From e608e9b79bbe841e3f70b629c5a8c1e8a2db8718 Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Mon, 14 Sep 2015 15:56:12 -0700 Subject: [PATCH] Add stress First pass at running stress test from a toml --- cmd/influx_stress/example.toml | 6 ++- cmd/influx_stress/influx_stress.go | 18 +++----- stress/config.go | 67 ++++++++++++++++++++---------- stress/runner.go | 39 ++++++++--------- 4 files changed, 77 insertions(+), 53 deletions(-) diff --git a/cmd/influx_stress/example.toml b/cmd/influx_stress/example.toml index 1f3bb145da..650f3f27f3 100644 --- a/cmd/influx_stress/example.toml +++ b/cmd/influx_stress/example.toml @@ -3,13 +3,17 @@ batch_size = 5000 batch_interval = "0s" database = "stress" + precision = "1s" + address = "localhost:8086" reset_database = true starting_point = "6m" # how far back in time to go ## Fine grained control over what series gets written [[series]] - point_count = 1000 # number of points that will be written for each of the series + point_count = 100 # number of points that will be written for each of the series measurement = "cpu" + # series count + generic_tagset_cardinality = 100000 # defaults to 0. needs to be power of 10. [[series.tag]] key = "host" diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 9292fb02d9..d437848225 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -19,6 +19,7 @@ var ( database = flag.String("database", "stress", "name of database") address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)") precision = flag.String("precision", "n", "The precision that points in the database will be with") + test = flag.String("test", "example.toml", "The stress test file") ) var ms runner.Measurements @@ -29,24 +30,17 @@ func init() { func main() { flag.Parse() + + cfg, err := runner.DecodeFile(*test) + + fmt.Println(err) + runtime.GOMAXPROCS(runtime.NumCPU()) if len(ms) == 0 { ms = append(ms, "cpu") } - cfg := &runner.Config{ - BatchSize: *batchSize, - Measurements: ms, - SeriesCount: *seriesCount, - PointCount: *pointCount, - Concurrency: *concurrency, - BatchInterval: *batchInterval, - Database: *database, - Address: *address, - Precision: *precision, - } - totalPoints, failedRequests, responseTimes, timer := runner.Run(cfg) sort.Sort(sort.Reverse(sort.Interface(responseTimes))) diff --git a/stress/config.go b/stress/config.go index b2dfa736c6..e4bd98faf6 100644 --- a/stress/config.go +++ b/stress/config.go @@ -1,10 +1,10 @@ -package main +package runner import ( "fmt" - "time" "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/client" ) type tag struct { @@ -24,37 +24,62 @@ type series struct { Tags []tag `toml:"series.tag"` Fields []field `toml:"series.field"` } +type seriesIter struct { + s *series + count int +} + +// iterates through the point +func (s *series) Iter() *seriesIter { + return &seriesIter{s: s, count: 0} +} + +// makes one point for each of its series +// iterates through all of the series +func (iter *seriesIter) Next() (client.Point, bool) { + if iter.count > iter.s.GenericTagsetCardinality { + return client.Point{}, false + } + p := client.Point{ + Measurement: iter.s.Measurement, + Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", iter.count)}, + Fields: map[string]interface{}{"value": 1.0}, + } + iter.count++ + return p, true +} type write struct { - Concurrency int `toml:"concurrency"` - BatchSize int `toml:"batch_size"` - BatchInterval time.Duration `toml:"batch_interval"` - Database string `toml:"database"` - ResetDatabase bool `toml:"reset_database"` - StartingPoint time.Duration `toml:"starting_time"` + Concurrency int `toml:"concurrency"` + BatchSize int `toml:"batch_size"` + BatchInterval string `toml:"batch_interval"` + Database string `toml:"database"` + ResetDatabase bool `toml:"reset_database"` + StartingPoint string `toml:"starting_time"` + Address string `toml:"address"` + Precision string `toml:"precision"` } type query struct { - Concurrency int `toml:"concurrency"` - Measurement string `toml:"measurement"` - TagKey string `toml:"tag_key"` - TimeFrame time.Duration `toml:"time_frame"` - Statement string `toml:"statement"` + Concurrency int `toml:"concurrency"` + Measurement string `toml:"measurement"` + TagKey string `toml:"tag_key"` + TimeFrame string `toml:"time_frame"` + Statement string `toml:"statement"` } -type Test struct { +type StressTest struct { Write write `toml:"write"` Series []series `toml:"series"` Queries []query `toml:"query"` } -func main() { - t := &Test{} - if _, err := toml.DecodeFile("example.toml", t); err != nil { - fmt.Println(err) - return +func DecodeFile(s string) (*StressTest, error) { + t := &StressTest{} + + if _, err := toml.DecodeFile(s, t); err != nil { + return nil, err } - fmt.Println(t) - + return t, nil } diff --git a/stress/runner.go b/stress/runner.go index f098c1bdc8..ff3acab3c9 100644 --- a/stress/runner.go +++ b/stress/runner.go @@ -2,7 +2,7 @@ package runner import ( "fmt" - "math/rand" + //"math/rand" "net/url" "strings" "sync" @@ -120,8 +120,9 @@ type Config struct { // newClient returns a pointer to an InfluxDB client for // a `Config`'s `Address` field. If an error is encountered // when creating a new client, the function panics. -func (cfg *Config) NewClient() (*client.Client, error) { - u, _ := url.Parse(fmt.Sprintf("http://%s", cfg.Address)) +func (cfg *StressTest) NewClient() (*client.Client, error) { + fmt.Printf("CFG: %#v\n", cfg) + u, _ := url.Parse(fmt.Sprintf("http://%s", cfg.Write.Address)) c, err := client.NewClient(client.Config{URL: *u}) if err != nil { return nil, err @@ -133,7 +134,8 @@ func (cfg *Config) NewClient() (*client.Client, error) { // It returns the total number of points that were during the test, // an slice of all of the stress tests response times, // and the times that the test started at and ended as a `Timer` -func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes ResponseTimes, timer *Timer) { +func Run(cfg *StressTest) (totalPoints int, failedRequests int, responseTimes ResponseTimes, timer *Timer) { + timer = NewTimer() defer timer.StopTimer() @@ -142,7 +144,7 @@ func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes Respon panic(err) } - counter := NewConcurrencyLimiter(cfg.Concurrency) + counter := NewConcurrencyLimiter(cfg.Write.Concurrency) var mu sync.Mutex var wg sync.WaitGroup @@ -155,25 +157,23 @@ func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes Respon lastSuccess := true batch := &client.BatchPoints{ - Database: cfg.Database, + Database: cfg.Write.Database, WriteConsistency: "any", Time: time.Now(), - Precision: cfg.Precision, + Precision: cfg.Write.Precision, // Should be cfg.Write.Precision } - for i := 1; i <= cfg.PointCount; i++ { - for j := 1; j <= cfg.SeriesCount; j++ { - for _, m := range cfg.Measurements { - p := client.Point{ - Measurement: m, - Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)}, - Fields: map[string]interface{}{"value": rand.Float64()}, - } + for _, testSeries := range cfg.Series { + for i := 0; i < testSeries.PointCount; i++ { + iter := testSeries.Iter() + var p client.Point + for ok := true; ok; p, ok = iter.Next() { batch.Points = append(batch.Points, p) - if len(batch.Points) >= cfg.BatchSize { + if len(batch.Points) >= cfg.Write.BatchSize { wg.Add(1) counter.Increment() totalPoints += len(batch.Points) + go func(b *client.BatchPoints, total int) { st := time.Now() if _, err := c.Write(*b); err != nil { @@ -194,16 +194,17 @@ func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes Respon responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) mu.Unlock() } - time.Sleep(cfg.BatchInterval) + //time.Sleep(cfg.Write.BatchInterval) + time.Sleep(0 * time.Second) wg.Done() counter.Decrement() if total%500000 == 0 { - fmt.Printf("%d total points. %d in %s\n", total, cfg.BatchSize, time.Since(st)) + fmt.Printf("%d total points. %d in %s\n", total, cfg.Write.BatchSize, time.Since(st)) } }(batch, totalPoints) batch = &client.BatchPoints{ - Database: cfg.Database, + Database: cfg.Write.Database, WriteConsistency: "any", Precision: "n", Time: time.Now(),