From 2b0b66d7d6aafc157a3b94fcb3b593f8fd7c48d4 Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Sat, 22 Aug 2015 05:08:32 -0700 Subject: [PATCH] Add a configurable `Measurements` type Previously the measurement that was getting written into InfluxDB was hard coded in the `Run` method as `cpu`. Now you can specify a measurements by passing an `-m` flag to `influx_stress`. The `-m` flag accepts a comma separated list of measurements. (e.g. `influx_stress -m cpu,mem,disk`) --- cmd/influx_stress/influx_stress.go | 11 ++++ cmd/influx_stress/runner/runner.go | 84 ++++++++++++++++--------- cmd/influx_stress/runner/runner_test.go | 45 +++++++++++-- 3 files changed, 105 insertions(+), 35 deletions(-) diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 2c0ea8d7a5..5bb8f8de23 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -20,12 +20,23 @@ var ( address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)") ) +var ms runner.Measurements + +func init() { + flag.Var(&ms, "m", "comma-separated list of intervals to use between events") +} + func main() { flag.Parse() runtime.GOMAXPROCS(runtime.NumCPU()) + if len(ms) == 0 { + ms = append(ms, "cpu") + } + cfg := &runner.Config{ BatchSize: *batchSize, + Measurements: ms, SeriesCount: *seriesCount, PointCount: *pointCount, Concurrency: *concurrency, diff --git a/cmd/influx_stress/runner/runner.go b/cmd/influx_stress/runner/runner.go index 7075f93641..0f37e743e3 100644 --- a/cmd/influx_stress/runner/runner.go +++ b/cmd/influx_stress/runner/runner.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "net/url" + "strings" "sync" "time" @@ -84,9 +85,29 @@ func (rs ResponseTimes) Swap(i, j int) { rs[i], rs[j] = rs[j], rs[i] } +type Measurements []string + +// String returns a string and implements the `String` method for +// the flag.Value interface. +func (ms *Measurements) String() string { + return fmt.Sprint(*ms) +} + +// Set implements the `Set` method for the flag.Value +// interface. Set splits a string of comma separated values +// into a `Measurement`. +func (ms *Measurements) Set(value string) error { + values := strings.Split(value, ",") + for _, m := range values { + *ms = append(*ms, m) + } + return nil +} + // Config is a struct that is passed into the `Run()` function. type Config struct { BatchSize int + Measurements Measurements SeriesCount int PointCount int Concurrency int @@ -134,39 +155,42 @@ func Run(cfg *Config) (totalPoints int, responseTimes ResponseTimes, timer *Time Time: time.Now(), Precision: "n", } + for i := 1; i <= cfg.PointCount; i++ { for j := 1; j <= cfg.SeriesCount; j++ { - p := client.Point{ - Measurement: "cpu", - Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)}, - Fields: map[string]interface{}{"value": rand.Float64()}, - } - batch.Points = append(batch.Points, p) - if len(batch.Points) >= cfg.BatchSize { - wg.Add(1) - counter.Increment() - totalPoints += len(batch.Points) - go func(b *client.BatchPoints, total int) { - st := time.Now() - if _, err := c.Write(*b); err != nil { - fmt.Println("ERROR: ", err.Error()) - } else { - mu.Lock() - responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) - mu.Unlock() - } - wg.Done() - counter.Decrement() - if total%500000 == 0 { - fmt.Printf("%d total points. %d in %s\n", total, cfg.BatchSize, time.Since(st)) - } - }(batch, totalPoints) + for _, m := range cfg.Measurements { + p := client.Point{ + Measurement: m, + Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)}, + Fields: map[string]interface{}{"value": rand.Float64()}, + } + batch.Points = append(batch.Points, p) + if len(batch.Points) >= cfg.BatchSize { + wg.Add(1) + counter.Increment() + totalPoints += len(batch.Points) + go func(b *client.BatchPoints, total int) { + st := time.Now() + if _, err := c.Write(*b); err != nil { + fmt.Println("ERROR: ", err.Error()) + } else { + mu.Lock() + responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) + mu.Unlock() + } + wg.Done() + counter.Decrement() + if total%500000 == 0 { + fmt.Printf("%d total points. %d in %s\n", total, cfg.BatchSize, time.Since(st)) + } + }(batch, totalPoints) - batch = &client.BatchPoints{ - Database: cfg.Database, - WriteConsistency: "any", - Precision: "n", - Time: time.Now(), + batch = &client.BatchPoints{ + Database: cfg.Database, + WriteConsistency: "any", + Precision: "n", + Time: time.Now(), + } } } } diff --git a/cmd/influx_stress/runner/runner_test.go b/cmd/influx_stress/runner/runner_test.go index c8a36fb7c6..2a530af89e 100644 --- a/cmd/influx_stress/runner/runner_test.go +++ b/cmd/influx_stress/runner/runner_test.go @@ -86,6 +86,31 @@ func TestNewResponseTime(t *testing.T) { } +func TestMeasurments_Set(t *testing.T) { + ms := make(runner.Measurements, 0) + + ms.Set("this,is,a,test") + + if ms[0] != "this" { + t.Errorf("expected value to be %v, got %v", "this", ms[0]) + } + + if ms[1] != "is" { + t.Errorf("expected value to be %v, got %v", "is", ms[1]) + } + + ms.Set("more,here") + + if ms[4] != "more" { + t.Errorf("expected value to be %v, got %v", "more", ms[4]) + } + + if len(ms) != 6 { + t.Errorf("expected the length of ms to be %v, got %v", 6, len(ms)) + } + +} + func TestConfig_newClient(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Influxdb-Version", "x.x") @@ -93,10 +118,14 @@ func TestConfig_newClient(t *testing.T) { })) defer ts.Close() + ms := make(runner.Measurements, 0) + ms.Set("this,is,a,test") + url := ts.URL[7:] cfg := &runner.Config{ BatchSize: 5000, + Measurements: ms, SeriesCount: 10000, PointCount: 100, Concurrency: 10, @@ -139,12 +168,16 @@ func TestRun(t *testing.T) { })) defer ts.Close() + ms := make(runner.Measurements, 0) + ms.Set("this,is,a,test") + url := ts.URL[7:] cfg := &runner.Config{ BatchSize: 5000, + Measurements: ms, SeriesCount: 10000, - PointCount: 100, + PointCount: 10, Concurrency: 10, BatchInterval: time.Duration(0), Database: "stress", @@ -153,12 +186,14 @@ func TestRun(t *testing.T) { tp, rts, tmr := runner.Run(cfg) - if tp != cfg.SeriesCount*cfg.PointCount { - t.Fatalf("unexpected error. expected %v, actual %v", 1000000, tp) + ps := cfg.SeriesCount * cfg.PointCount * len(cfg.Measurements) + + if tp != ps { + t.Fatalf("unexpected error. expected %v, actual %v", ps, tp) } - if len(rts) != cfg.SeriesCount*cfg.PointCount/cfg.BatchSize { - t.Fatalf("unexpected error. expected %v, actual %v", 1000000, len(rts)) + if len(rts) != ps/cfg.BatchSize { + t.Fatalf("unexpected error. expected %v, actual %v", ps/cfg.BatchSize, len(rts)) } var epoch time.Time