Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add basic tool to put load on pilosa #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ release:
make crossbuild GOOS=linux GOARCH=386
make crossbuild GOOS=darwin GOARCH=amd64

install: install-pi install-imagine install-dx
install: install-pi install-imagine install-dx install-loader

install-dx:
go install -ldflags $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/dx
Expand All @@ -53,6 +53,9 @@ install-imagine:
install-pi:
go install -ldflags $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/pi

install-loader:
go install -ldflags $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/loader


generate: enumer-install
cd imagine && \
Expand Down
14 changes: 14 additions & 0 deletions cmd/loader/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"log"
"os"

"github.com/pilosa/tools/loader"
)

func main() {
if err := loader.Run(os.Args, os.Stdin, os.Stdout, os.Stderr); err != nil {
log.Fatal(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/jaffee/commandeer v0.1.0
github.com/kr/pty v1.1.8 // indirect
github.com/miekg/dns v1.1.15 // indirect
github.com/molecula/apophenia v0.0.0-20190827192002-68b7a14a478b
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/pilosa/go-pilosa v1.3.1-0.20190715210601-8606626b90d6
github.com/pilosa/pilosa v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/molecula/apophenia v0.0.0-20190827192002-68b7a14a478b h1:cZADDaNYM7xn/nklO3g198JerGQjadFuA0ofxBJgK0Y=
github.com/molecula/apophenia v0.0.0-20190827192002-68b7a14a478b/go.mod h1:uXd1BiH7xLmgkhVmspdJLENv6uGWrTL/MQX2TN7Yz9s=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
Expand Down
48 changes: 48 additions & 0 deletions loader/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Loader

Loader is a very simple tool used to create load on a pilosa instance. It has a simple `toml` config to create tasks. It currently only reports success/error for calls. It purposely doesn't track stats. Stats should be monitored from the pilosa stats endpoints, such as the `/metrics` endpoint that feed prometheus, etc.

## Config

A sample config would look like this:

```toml
[[Tasks]]
Connections = 10
Delay = "10ms"
Query = "TopN(model, n=5)"
URL = "http://localhost:10101/index/equipment/query"

[[Tasks]]
Connections = 1
Delay = "1s"
Query = "TopN(model, n=5)"
URL = "http://localhost:10101/index/equipment/query"
```

You can have as many tasks as you want, but you need at least one for the program to create any load.

### Definitions

| Variable | Definition |
| --- | --- |
| Connections | Number of connections for this task to use. If you specify `10`, it means that it will create `10` actual connections for this task |
| Delay | This is the amount of time between each request per connection. If `1s` is specified, then it will delay for `1s` between each request |
| Query | The desired query to execute |
| URL | The query endoint to post the query to |

The program will try to stagger the requests per task if more than one connection is specified. For example, if you specify `10` connections, and a delay of `10ms`, then it will fire one request every `1ms`, for a total of `10` requests every `10ms`.

## Usage

To get a template for a new config, you can issue the following command:

```sh
loader config
```

To run the program with a specific config:

```sh
loader -config ./loader/sample.config.toml
```
66 changes: 66 additions & 0 deletions loader/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package loader

import (
"io"
"time"

"github.com/BurntSushi/toml"
)

// the toml package does not handle durations properly, so we have to create special handling for them.

type duration time.Duration

func (d *duration) UnmarshalText(text []byte) error {
dr, err := time.ParseDuration(string(text))
if err != nil {
return err
}
*d = duration(dr)
return nil
}

func (d duration) MarshalText() (text []byte, err error) {
return []byte(time.Duration(d).String()), nil
}

func DemoConfig() *Config {
return &Config{
Tasks: Tasks{
{
Connections: 10,
Delay: duration(time.Millisecond * 10),
URL: "http://localhost:10101/index/equipment/query",
Query: "TopN(model, n=5)",
},
{
Connections: 1,
Delay: duration(time.Second * 1),
URL: "http://localhost:10101/index/equipment/query",
Query: "TopN(model, n=5)",
},
},
}
}

type Tasks []Task

type Task struct {
Connections int
Delay duration
Query string
URL string
}

type Config struct {
Tasks Tasks
}

func (c *Config) Load(path string) error {
_, err := toml.DecodeFile(path, c)
return err
}

func (c *Config) Print(w io.Writer) error {
return toml.NewEncoder(w).Encode(c)
}
198 changes: 198 additions & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package loader

import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/pilosa/tools"
)

func Run(args []string, stdin io.Reader, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet(args[0], flag.ExitOnError)
var config = flags.String("config", "", "specify a toml config file")

if err := flags.Parse(args[1:]); err != nil {
return err
}

cfg := &Config{}
if *config != "" {
fmt.Fprintf(stdout, "loading config from %s\n", *config)
if err := cfg.Load(*config); err != nil {
return fmt.Errorf("error loading config %q: %w", *config, err)
}
}

var command string
commands := flags.Args()
if len(commands) == 1 {
command = commands[0]
}

l := &loader{
stderr: stderr,
stdout: stdout,
stats: newStats(),
}

l.Println(`Version: ` + tools.Version + `+ Build Time: ` + tools.BuildTime + "\n")

switch command {
case "config":
return l.printConfig(cfg)
default:
return l.load(cfg)
}
}

type loader struct {
stderr io.Writer
stdout io.Writer
stats *stats
}

func (l *loader) Println(args ...interface{}) {
fmt.Fprintln(l.stderr, args...)
}
func (l *loader) Printf(f string, args ...interface{}) {
fmt.Fprintf(l.stderr, f, args...)
}

func (l *loader) printConfig(cfg *Config) error {
if len(cfg.Tasks) == 0 {
l.Println("no config provided, printing a sample config\n")
cfg = DemoConfig()
}
return cfg.Print(l.stdout)
}

func (l *loader) load(cfg *Config) error {
if len(cfg.Tasks) == 0 {
return errors.New("No tasks found in config")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create a channel to block everyone on so that all tasks are spun up and ready to go before loading
start := make(chan struct{})

// create a wg to coordinate all tasks
var wg = &sync.WaitGroup{}
wg.Add(len(cfg.Tasks))

for _, t := range cfg.Tasks {
go l.launchTask(ctx, t, start, wg)
}
// let all tasks start
close(start)

// start monitor loop
go l.printStats(ctx)

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

// Block until one of the signals above is received
<-signalCh
l.Println("Signal received, initializing clean shutdown...")
cancel()

finished := make(chan struct{})
go func(finished chan struct{}) {
wg.Wait()
close(finished)
}(finished)

// Block again until another signal is received, a shutdown timeout elapses,
// or the Command is gracefully closed
l.Println("Waiting for clean shutdown...")
select {
case <-signalCh:
return errors.New("second signal received, initializing hard shutdown")
case <-time.After(time.Second * 30):
return errors.New("time limit reached, initializing hard shutdown")
case <-finished:
l.Println("shutdown completed")
return nil
}
}

func (l *loader) launchTask(ctx context.Context, t Task, start chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

// calculate time offset between connections
offset := time.Duration(int64(time.Duration(t.Delay)) / int64(t.Connections))

var cwg = &sync.WaitGroup{}
cwg.Add(t.Connections)
for i := 0; i < t.Connections; i++ {
go l.childTask(ctx, cwg, start, t, offset*time.Duration(i))
}

<-ctx.Done()

cwg.Wait()
}

func (l *loader) childTask(ctx context.Context, wg *sync.WaitGroup, start chan struct{}, t Task, offset time.Duration) {
defer wg.Done()

client := http.Client{}

req, _ := http.NewRequest("POST", t.URL, bytes.NewBufferString(t.Query))

proc := func() {
_, err := client.Do(req)
if err != nil {
l.stats.error(t.Query)
}
l.stats.success(t.Query)
}

// wait to start
<-start
// don't start all children at the same time, try to keep them spaced apart
time.Sleep(offset)
proc()

// begin interval loop
tick := time.NewTicker(time.Duration(t.Delay))
defer tick.Stop()

for {
select {
case <-tick.C:
proc()
case <-ctx.Done():
return
}
}
}

func (l *loader) printStats(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := l.stats.counts()
for k, v := range stats {
l.Printf("%s\t success: %d\t errors: %d\n", k, v[0], v[1])
}
}
}
}
11 changes: 11 additions & 0 deletions loader/sample.config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[[Tasks]]
Connections = 10
Delay = "1s"
Query = "TopN(model, n=5)"
URL = "http://localhost:10101/index/equipment/query"

[[Tasks]]
Connections = 10
Delay = "1s"
Query = "TopN(model, n=5)"
URL = "http://localhost:10101/index/equipment/query"
Loading