Add a configurable `Measurements` type

Previously the measurement that was getting written into InfluxDB was
hard coded in the `Run` method as `cpu`. Now you can specify a
measurements by passing an `-m` flag to `influx_stress`.

The `-m` flag accepts a comma separated list of measurements. (e.g.
`influx_stress -m cpu,mem,disk`)
pull/3798/head
Michael Desa 2015-08-22 05:08:32 -07:00
parent d63ac6a842
commit 2b0b66d7d6
3 changed files with 105 additions and 35 deletions

View File

@ -20,12 +20,23 @@ var (
address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)") address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)")
) )
var ms runner.Measurements
func init() {
flag.Var(&ms, "m", "comma-separated list of intervals to use between events")
}
func main() { func main() {
flag.Parse() flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
if len(ms) == 0 {
ms = append(ms, "cpu")
}
cfg := &runner.Config{ cfg := &runner.Config{
BatchSize: *batchSize, BatchSize: *batchSize,
Measurements: ms,
SeriesCount: *seriesCount, SeriesCount: *seriesCount,
PointCount: *pointCount, PointCount: *pointCount,
Concurrency: *concurrency, Concurrency: *concurrency,

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net/url" "net/url"
"strings"
"sync" "sync"
"time" "time"
@ -84,9 +85,29 @@ func (rs ResponseTimes) Swap(i, j int) {
rs[i], rs[j] = rs[j], rs[i] rs[i], rs[j] = rs[j], rs[i]
} }
type Measurements []string
// String returns a string and implements the `String` method for
// the flag.Value interface.
func (ms *Measurements) String() string {
return fmt.Sprint(*ms)
}
// Set implements the `Set` method for the flag.Value
// interface. Set splits a string of comma separated values
// into a `Measurement`.
func (ms *Measurements) Set(value string) error {
values := strings.Split(value, ",")
for _, m := range values {
*ms = append(*ms, m)
}
return nil
}
// Config is a struct that is passed into the `Run()` function. // Config is a struct that is passed into the `Run()` function.
type Config struct { type Config struct {
BatchSize int BatchSize int
Measurements Measurements
SeriesCount int SeriesCount int
PointCount int PointCount int
Concurrency int Concurrency int
@ -134,39 +155,42 @@ func Run(cfg *Config) (totalPoints int, responseTimes ResponseTimes, timer *Time
Time: time.Now(), Time: time.Now(),
Precision: "n", Precision: "n",
} }
for i := 1; i <= cfg.PointCount; i++ { for i := 1; i <= cfg.PointCount; i++ {
for j := 1; j <= cfg.SeriesCount; j++ { for j := 1; j <= cfg.SeriesCount; j++ {
p := client.Point{ for _, m := range cfg.Measurements {
Measurement: "cpu", p := client.Point{
Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)}, Measurement: m,
Fields: map[string]interface{}{"value": rand.Float64()}, Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)},
} Fields: map[string]interface{}{"value": rand.Float64()},
batch.Points = append(batch.Points, p) }
if len(batch.Points) >= cfg.BatchSize { batch.Points = append(batch.Points, p)
wg.Add(1) if len(batch.Points) >= cfg.BatchSize {
counter.Increment() wg.Add(1)
totalPoints += len(batch.Points) counter.Increment()
go func(b *client.BatchPoints, total int) { totalPoints += len(batch.Points)
st := time.Now() go func(b *client.BatchPoints, total int) {
if _, err := c.Write(*b); err != nil { st := time.Now()
fmt.Println("ERROR: ", err.Error()) if _, err := c.Write(*b); err != nil {
} else { fmt.Println("ERROR: ", err.Error())
mu.Lock() } else {
responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) mu.Lock()
mu.Unlock() responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds())))
} mu.Unlock()
wg.Done() }
counter.Decrement() wg.Done()
if total%500000 == 0 { counter.Decrement()
fmt.Printf("%d total points. %d in %s\n", total, cfg.BatchSize, time.Since(st)) if total%500000 == 0 {
} fmt.Printf("%d total points. %d in %s\n", total, cfg.BatchSize, time.Since(st))
}(batch, totalPoints) }
}(batch, totalPoints)
batch = &client.BatchPoints{ batch = &client.BatchPoints{
Database: cfg.Database, Database: cfg.Database,
WriteConsistency: "any", WriteConsistency: "any",
Precision: "n", Precision: "n",
Time: time.Now(), Time: time.Now(),
}
} }
} }
} }

View File

@ -86,6 +86,31 @@ func TestNewResponseTime(t *testing.T) {
} }
func TestMeasurments_Set(t *testing.T) {
ms := make(runner.Measurements, 0)
ms.Set("this,is,a,test")
if ms[0] != "this" {
t.Errorf("expected value to be %v, got %v", "this", ms[0])
}
if ms[1] != "is" {
t.Errorf("expected value to be %v, got %v", "is", ms[1])
}
ms.Set("more,here")
if ms[4] != "more" {
t.Errorf("expected value to be %v, got %v", "more", ms[4])
}
if len(ms) != 6 {
t.Errorf("expected the length of ms to be %v, got %v", 6, len(ms))
}
}
func TestConfig_newClient(t *testing.T) { func TestConfig_newClient(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Influxdb-Version", "x.x") w.Header().Set("X-Influxdb-Version", "x.x")
@ -93,10 +118,14 @@ func TestConfig_newClient(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
ms := make(runner.Measurements, 0)
ms.Set("this,is,a,test")
url := ts.URL[7:] url := ts.URL[7:]
cfg := &runner.Config{ cfg := &runner.Config{
BatchSize: 5000, BatchSize: 5000,
Measurements: ms,
SeriesCount: 10000, SeriesCount: 10000,
PointCount: 100, PointCount: 100,
Concurrency: 10, Concurrency: 10,
@ -139,12 +168,16 @@ func TestRun(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
ms := make(runner.Measurements, 0)
ms.Set("this,is,a,test")
url := ts.URL[7:] url := ts.URL[7:]
cfg := &runner.Config{ cfg := &runner.Config{
BatchSize: 5000, BatchSize: 5000,
Measurements: ms,
SeriesCount: 10000, SeriesCount: 10000,
PointCount: 100, PointCount: 10,
Concurrency: 10, Concurrency: 10,
BatchInterval: time.Duration(0), BatchInterval: time.Duration(0),
Database: "stress", Database: "stress",
@ -153,12 +186,14 @@ func TestRun(t *testing.T) {
tp, rts, tmr := runner.Run(cfg) tp, rts, tmr := runner.Run(cfg)
if tp != cfg.SeriesCount*cfg.PointCount { ps := cfg.SeriesCount * cfg.PointCount * len(cfg.Measurements)
t.Fatalf("unexpected error. expected %v, actual %v", 1000000, tp)
if tp != ps {
t.Fatalf("unexpected error. expected %v, actual %v", ps, tp)
} }
if len(rts) != cfg.SeriesCount*cfg.PointCount/cfg.BatchSize { if len(rts) != ps/cfg.BatchSize {
t.Fatalf("unexpected error. expected %v, actual %v", 1000000, len(rts)) t.Fatalf("unexpected error. expected %v, actual %v", ps/cfg.BatchSize, len(rts))
} }
var epoch time.Time var epoch time.Time