diff --git a/Makefile b/Makefile
index 39364f6..58d4be2 100644
--- a/Makefile
+++ b/Makefile
@@ -42,7 +42,7 @@ release:
 	make crossbuild GOOS=linux GOARCH=386
 	make crossbuild GOOS=darwin GOARCH=amd64
 
-install: install-pi install-imagine install-dx
+install: install-pi install-imagine install-dx install-loader
 
 install-dx:
 	go install -ldflags $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/dx
@@ -53,6 +53,9 @@ install-imagine:
 
 install-pi:
 	go install -ldflags $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/pi
 
+install-loader:
+	go install -ldflags $(LDFLAGS) $(FLAGS) $(CLONE_URL)/cmd/loader
+
 generate: enumer-install
 	cd imagine && \
diff --git a/cmd/loader/main.go b/cmd/loader/main.go
new file mode 100644
index 0000000..91c26a9
--- /dev/null
+++ b/cmd/loader/main.go
@@ -0,0 +1,14 @@
+package main
+
+import (
+	"log"
+	"os"
+
+	"github.com/pilosa/tools/loader"
+)
+
+func main() {
+	if err := loader.Run(os.Args, os.Stdin, os.Stdout, os.Stderr); err != nil {
+		log.Fatal(err)
+	}
+}
diff --git a/go.mod b/go.mod
index 09af251..596ede5 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@ require (
 	github.com/jaffee/commandeer v0.1.0
 	github.com/kr/pty v1.1.8 // indirect
 	github.com/miekg/dns v1.1.15 // indirect
+	github.com/molecula/apophenia v0.0.0-20190827192002-68b7a14a478b
 	github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
 	github.com/pilosa/go-pilosa v1.3.1-0.20190715210601-8606626b90d6
 	github.com/pilosa/pilosa v1.3.1
diff --git a/go.sum b/go.sum
index d5de1de..6d3f703 100644
--- a/go.sum
+++ b/go.sum
@@ -185,6 +185,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
 github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/molecula/apophenia v0.0.0-20190827192002-68b7a14a478b 
h1:cZADDaNYM7xn/nklO3g198JerGQjadFuA0ofxBJgK0Y=
+github.com/molecula/apophenia v0.0.0-20190827192002-68b7a14a478b/go.mod h1:uXd1BiH7xLmgkhVmspdJLENv6uGWrTL/MQX2TN7Yz9s=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
diff --git a/loader/README.md b/loader/README.md
new file mode 100644
index 0000000..0a972a2
--- /dev/null
+++ b/loader/README.md
@@ -0,0 +1,48 @@
+# Loader
+
+Loader is a very simple tool used to create load on a pilosa instance. It has a simple `toml` config to create tasks. It currently only reports success/error for calls. It purposely doesn't track stats. Stats should be monitored from the pilosa stats endpoints, such as the `/metrics` endpoint that feeds prometheus, etc.
+
+## Config
+
+A sample config would look like this:
+
+```toml
+[[Tasks]]
+  Connections = 10
+  Delay = "10ms"
+  Query = "TopN(model, n=5)"
+  URL = "http://localhost:10101/index/equipment/query"
+
+[[Tasks]]
+  Connections = 1
+  Delay = "1s"
+  Query = "TopN(model, n=5)"
+  URL = "http://localhost:10101/index/equipment/query"
+```
+
+You can have as many tasks as you want, but you need at least one for the program to create any load.
+
+### Definitions
+
+| Variable | Definition |
+| --- | --- |
+| Connections | Number of connections for this task to use. If you specify `10`, it means that it will create `10` actual connections for this task |
+| Delay | This is the amount of time between each request per connection. If `1s` is specified, then it will delay for `1s` between each request |
+| Query | The desired query to execute |
+| URL | The query endpoint to post the query to |
+
+The program will try to stagger the requests per task if more than one connection is specified. 
For example, if you specify `10` connections, and a delay of `10ms`, then it will fire one request every `1ms`, for a total of `10` requests every `10ms`.
+
+## Usage
+
+To get a template for a new config, you can issue the following command:
+
+```sh
+loader config
+```
+
+To run the program with a specific config:
+
+```sh
+loader -config ./loader/sample.config.toml
+```
diff --git a/loader/config.go b/loader/config.go
new file mode 100644
index 0000000..e476ffb
--- /dev/null
+++ b/loader/config.go
@@ -0,0 +1,72 @@
+package loader
+
+import (
+	"io"
+	"time"
+
+	"github.com/BurntSushi/toml"
+)
+
+// the toml package does not handle durations properly, so we have to create special handling for them.
+
+type duration time.Duration
+
+func (d *duration) UnmarshalText(text []byte) error {
+	dr, err := time.ParseDuration(string(text))
+	if err != nil {
+		return err
+	}
+	*d = duration(dr)
+	return nil
+}
+
+func (d duration) MarshalText() (text []byte, err error) {
+	return []byte(time.Duration(d).String()), nil
+}
+
+// DemoConfig returns a sample two-task configuration; it is printed as a template by the "config" subcommand.
+func DemoConfig() *Config {
+	return &Config{
+		Tasks: Tasks{
+			{
+				Connections: 10,
+				Delay:       duration(time.Millisecond * 10),
+				URL:         "http://localhost:10101/index/equipment/query",
+				Query:       "TopN(model, n=5)",
+			},
+			{
+				Connections: 1,
+				Delay:       duration(time.Second * 1),
+				URL:         "http://localhost:10101/index/equipment/query",
+				Query:       "TopN(model, n=5)",
+			},
+		},
+	}
+}
+
+// Tasks is the list of load-generation tasks to run concurrently.
+type Tasks []Task
+
+// Task describes one query to POST repeatedly: how many connections, how often, what, and where.
+type Task struct {
+	Connections int
+	Delay       duration
+	Query       string
+	URL         string
+}
+
+// Config is the top-level toml configuration for the loader.
+type Config struct {
+	Tasks Tasks
+}
+
+// Load decodes the toml file at path into c.
+func (c *Config) Load(path string) error {
+	_, err := toml.DecodeFile(path, c)
+	return err
+}
+
+// Print encodes c as toml to w.
+func (c *Config) Print(w io.Writer) error {
+	return toml.NewEncoder(w).Encode(c)
+}
diff --git a/loader/loader.go b/loader/loader.go
new file mode 100644
index 0000000..22ea993
--- /dev/null
+++ b/loader/loader.go
@@ -0,0 +1,198 @@
+package loader
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	
"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/pilosa/tools"
+)
+
+func Run(args []string, stdin io.Reader, stdout, stderr io.Writer) error { // Run is the CLI entry point; stdin is accepted for symmetry with the other commands but is currently unused
+	flags := flag.NewFlagSet(args[0], flag.ExitOnError) // ExitOnError: a bad flag terminates the process with usage output
+	var config = flags.String("config", "", "specify a toml config file")
+
+	if err := flags.Parse(args[1:]); err != nil {
+		return err
+	}
+
+	cfg := &Config{} // zero-value config is valid; load() rejects it if no tasks are present
+	if *config != "" {
+		fmt.Fprintf(stdout, "loading config from %s\n", *config)
+		if err := cfg.Load(*config); err != nil {
+			return fmt.Errorf("error loading config %q: %w", *config, err)
+		}
+	}
+
+	var command string
+	commands := flags.Args() // positional args remaining after flag parsing; only a single subcommand is recognized
+	if len(commands) == 1 {
+		command = commands[0]
+	}
+
+	l := &loader{
+		stderr: stderr,
+		stdout: stdout,
+		stats:  newStats(),
+	}
+
+	l.Println(`Version: ` + tools.Version + `+ Build Time: ` + tools.BuildTime + "\n")
+
+	switch command { // "config" prints the current (or a sample) config; anything else starts generating load
+	case "config":
+		return l.printConfig(cfg)
+	default:
+		return l.load(cfg)
+	}
+}
+
+type loader struct { // loader bundles the output writers with the shared success/error counters
+	stderr io.Writer
+	stdout io.Writer
+	stats  *stats
+}
+
+func (l *loader) Println(args ...interface{}) { // diagnostics go to stderr so stdout stays clean for config output
+	fmt.Fprintln(l.stderr, args...)
+}
+func (l *loader) Printf(f string, args ...interface{}) {
+	fmt.Fprintf(l.stderr, f, args...)
+}
+
+func (l *loader) printConfig(cfg *Config) error { // printConfig writes cfg as toml to stdout, substituting a sample config when no tasks are configured
+	if len(cfg.Tasks) == 0 {
+		l.Println("no config provided, printing a sample config") // FIX: Println already appends a newline; the embedded "\n" double-spaced output (flagged by go vet)
+		cfg = DemoConfig()
+	}
+	return cfg.Print(l.stdout)
+}
+
+func (l *loader) load(cfg *Config) error { // load runs every configured task until interrupt/SIGTERM, then waits up to 30s for a clean shutdown
+	if len(cfg.Tasks) == 0 {
+		return errors.New("no tasks found in config") // FIX: error strings are lowercase by Go convention (staticcheck ST1005)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// create a channel to block everyone on so that all tasks are spun up and ready to go before loading
+	start := make(chan struct{})
+
+	// create a wg to coordinate all tasks
+	var wg = &sync.WaitGroup{}
+	wg.Add(len(cfg.Tasks))
+
+	for _, t := range cfg.Tasks {
+		go l.launchTask(ctx, t, start, wg)
+	}
+	// let all tasks start
+	close(start)
+
+	// start monitor loop
+	go l.printStats(ctx)
+
+	signalCh := make(chan os.Signal, 1)
+	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
+
+	// Block until one of the signals above is received
+	<-signalCh
+	l.Println("Signal received, initializing clean shutdown...")
+	cancel()
+
+	finished := make(chan struct{})
+	go func(finished chan struct{}) {
+		wg.Wait()
+		close(finished)
+	}(finished)
+
+	// Block again until another signal is received, a shutdown timeout elapses,
+	// or the Command is gracefully closed
+	l.Println("Waiting for clean shutdown...")
+	select {
+	case <-signalCh:
+		return errors.New("second signal received, initializing hard shutdown")
+	case <-time.After(time.Second * 30):
+		return errors.New("time limit reached, initializing hard shutdown")
+	case <-finished:
+		l.Println("shutdown completed")
+		return nil
+	}
+}
+
+func (l *loader) launchTask(ctx context.Context, t Task, start chan struct{}, wg *sync.WaitGroup) { // launchTask fans out t.Connections childTask goroutines, staggered across one Delay period
+	defer wg.Done()
+
+	// calculate time offset between connections
+	offset := time.Duration(int64(time.Duration(t.Delay)) / int64(t.Connections)) // NOTE(review): integer-divides by Connections — a task with Connections == 0 panics here; validate config upstream
+
+	var cwg = &sync.WaitGroup{}
+	cwg.Add(t.Connections)
+	for i := 0; i < t.Connections; i++ {
+		go l.childTask(ctx, cwg, start, t, 
offset*time.Duration(i))
+	}
+
+	<-ctx.Done()
+
+	cwg.Wait()
+}
+
+func (l *loader) childTask(ctx context.Context, wg *sync.WaitGroup, start chan struct{}, t Task, offset time.Duration) { // childTask POSTs t.Query to t.URL once per Delay until ctx is canceled
+	defer wg.Done()
+
+	client := http.Client{} // NOTE(review): no Timeout is set, so a stalled server blocks this connection indefinitely — confirm intended
+	proc := func() {
+		req, _ := http.NewRequest("POST", t.URL, bytes.NewBufferString(t.Query)) // FIX: rebuild per call — Do consumes the body reader, so a reused request POSTs an empty body after the first send
+		resp, err := client.Do(req)
+		if err != nil {
+			l.stats.error(t.Query)
+			return // FIX: previously fell through and also counted a success for every error
+		}
+		resp.Body.Close() // FIX: the body was never closed, leaking connections
+		l.stats.success(t.Query)
+	}
+
+	// wait to start
+	<-start
+	// don't start all children at the same time, try to keep them spaced apart
+	time.Sleep(offset)
+	proc()
+
+	// begin interval loop
+	tick := time.NewTicker(time.Duration(t.Delay))
+	defer tick.Stop()
+
+	for {
+		select {
+		case <-tick.C:
+			proc()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (l *loader) printStats(ctx context.Context) { // printStats dumps per-query success/error totals to stderr once per second
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			stats := l.stats.counts()
+			for k, v := range stats {
+				l.Printf("%s\t success: %d\t errors: %d\n", k, v[0], v[1])
+			}
+		}
+	}
+}
diff --git a/loader/sample.config.toml b/loader/sample.config.toml
new file mode 100644
index 0000000..140ac8c
--- /dev/null
+++ b/loader/sample.config.toml
@@ -0,0 +1,11 @@
+[[Tasks]]
+  Connections = 10
+  Delay = "1s"
+  Query = "TopN(model, n=5)"
+  URL = "http://localhost:10101/index/equipment/query"
+
+[[Tasks]]
+  Connections = 10
+  Delay = "1s"
+  Query = "TopN(model, n=5)"
+  URL = "http://localhost:10101/index/equipment/query"
diff --git a/loader/stats.go b/loader/stats.go
new file mode 100644
index 0000000..2c4ea41
--- /dev/null
+++ b/loader/stats.go
@@ -0,0 +1,62 @@
+package loader
+
+import "sync"
+
+type stat struct { // stat is a mutex-guarded per-key counter
+	m  map[string]int
+	mu sync.RWMutex
+}
+
+func (s *stat) inc(k string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.m[k]++
+}
+
+func newStat() *stat {
+	return &stat{
+		m: make(map[string]int),
+	}
+}
+
+type stats struct { // stats pairs an error counter (e) with a success counter (s)
+	e *stat
+	s *stat
+}
+
+func newStats() *stats {
+	return 
&stats{
+		e: newStat(),
+		s: newStat(),
+	}
+}
+
+func (s *stats) success(k string) {
+	s.s.inc(k)
+}
+
+func (s *stats) error(k string) {
+	s.e.inc(k)
+}
+
+func (s *stats) counts() map[string][2]int { // counts snapshots per-query totals as [2]int{successes, errors}
+	m := make(map[string][2]int)
+
+	s.s.mu.RLock()
+	for k, v := range s.s.m {
+		a := m[k]
+		a[0] = v
+		m[k] = a
+	}
+	s.s.mu.RUnlock()
+
+	s.e.mu.RLock()
+	for k, v := range s.e.m {
+		a := m[k]
+		a[1] = v // FIX: was a[0], which clobbered the success count — printStats reads successes from v[0] and errors from v[1]
+		m[k] = a
+
+	}
+	s.e.mu.RUnlock()
+	return m
+}