Add stress

First pass at running stress test from a toml
pull/4168/head
Michael Desa 2015-09-14 15:56:12 -07:00
parent e43b60b8e1
commit e608e9b79b
4 changed files with 77 additions and 53 deletions


@@ -3,13 +3,17 @@
batch_size = 5000
batch_interval = "0s"
database = "stress"
precision = "1s"
address = "localhost:8086"
reset_database = true
starting_point = "6m" # how far back in time to go
## Fine-grained control over which series get written
[[series]]
point_count = 1000 # number of points that will be written for each of the series
point_count = 100 # number of points that will be written for each of the series
measurement = "cpu"
# series count
generic_tagset_cardinality = 100000 # defaults to 0. needs to be a power of 10.
[[series.tag]]
key = "host"


@@ -19,6 +19,7 @@ var (
database = flag.String("database", "stress", "name of database")
address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)")
precision = flag.String("precision", "n", "The precision to use when writing points to the database")
test = flag.String("test", "example.toml", "The stress test file")
)
var ms runner.Measurements
@@ -29,24 +30,17 @@ func init() {
func main() {
flag.Parse()
cfg, err := runner.DecodeFile(*test)
fmt.Println(err)
runtime.GOMAXPROCS(runtime.NumCPU())
if len(ms) == 0 {
ms = append(ms, "cpu")
}
cfg := &runner.Config{
BatchSize: *batchSize,
Measurements: ms,
SeriesCount: *seriesCount,
PointCount: *pointCount,
Concurrency: *concurrency,
BatchInterval: *batchInterval,
Database: *database,
Address: *address,
Precision: *precision,
}
totalPoints, failedRequests, responseTimes, timer := runner.Run(cfg)
sort.Sort(sort.Reverse(sort.Interface(responseTimes)))
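For reference, a minimal sketch (not the committed code) of how the new entry point could wire DecodeFile into Run with the decode error actually checked instead of printed and ignored; the runner import path and the log-based summary are assumptions:

package main

import (
	"flag"
	"log"
	"runtime"
	"sort"

	"github.com/influxdb/influxdb/stress/runner" // import path assumed for this sketch
)

var test = flag.String("test", "example.toml", "The stress test file")

func main() {
	flag.Parse()
	runtime.GOMAXPROCS(runtime.NumCPU())

	cfg, err := runner.DecodeFile(*test)
	if err != nil {
		log.Fatalf("decoding %s: %v", *test, err)
	}

	totalPoints, failedRequests, responseTimes, timer := runner.Run(cfg)
	sort.Sort(sort.Reverse(sort.Interface(responseTimes)))

	log.Printf("points written: %d, failed requests: %d", totalPoints, failedRequests)
	_ = timer // timing report elided in this sketch
}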


@@ -1,10 +1,10 @@
package main
package runner
import (
"fmt"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/client"
)
type tag struct {
@@ -24,37 +24,62 @@ type series struct {
Tags []tag `toml:"series.tag"`
Fields []field `toml:"series.field"`
}
type seriesIter struct {
s *series
count int
}
// Iter returns an iterator over the series' points
func (s *series) Iter() *seriesIter {
return &seriesIter{s: s, count: 0}
}
// makes one point for each of its series
// iterates through all of the series
func (iter *seriesIter) Next() (client.Point, bool) {
if iter.count > iter.s.GenericTagsetCardinality {
return client.Point{}, false
}
p := client.Point{
Measurement: iter.s.Measurement,
Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", iter.count)},
Fields: map[string]interface{}{"value": 1.0},
}
iter.count++
return p, true
}
type write struct {
Concurrency int `toml:"concurrency"`
BatchSize int `toml:"batch_size"`
BatchInterval time.Duration `toml:"batch_interval"`
Database string `toml:"database"`
ResetDatabase bool `toml:"reset_database"`
StartingPoint time.Duration `toml:"starting_time"`
Concurrency int `toml:"concurrency"`
BatchSize int `toml:"batch_size"`
BatchInterval string `toml:"batch_interval"`
Database string `toml:"database"`
ResetDatabase bool `toml:"reset_database"`
StartingPoint string `toml:"starting_time"`
Address string `toml:"address"`
Precision string `toml:"precision"`
}
type query struct {
Concurrency int `toml:"concurrency"`
Measurement string `toml:"measurement"`
TagKey string `toml:"tag_key"`
TimeFrame time.Duration `toml:"time_frame"`
Statement string `toml:"statement"`
Concurrency int `toml:"concurrency"`
Measurement string `toml:"measurement"`
TagKey string `toml:"tag_key"`
TimeFrame string `toml:"time_frame"`
Statement string `toml:"statement"`
}
type Test struct {
type StressTest struct {
Write write `toml:"write"`
Series []series `toml:"series"`
Queries []query `toml:"query"`
}
func main() {
t := &Test{}
if _, err := toml.DecodeFile("example.toml", t); err != nil {
fmt.Println(err)
return
func DecodeFile(s string) (*StressTest, error) {
t := &StressTest{}
if _, err := toml.DecodeFile(s, t); err != nil {
return nil, err
}
fmt.Println(t)
return t, nil
}
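As a usage sketch, not part of the commit: the decoder and the new series iterator compose as below. CountSeriesPoints is a hypothetical helper that would have to live inside package runner because `series` is unexported; note that Next() keeps emitting while count <= GenericTagsetCardinality, so a single pass yields cardinality+1 points per series.

package runner

import (
	"fmt"
	"log"
)

// CountSeriesPoints (hypothetical) counts the points one pass over each
// configured series would generate, without writing anything.
func CountSeriesPoints(path string) {
	cfg, err := DecodeFile(path)
	if err != nil {
		log.Fatal(err)
	}

	for _, s := range cfg.Series {
		iter := s.Iter()
		n := 0
		for p, ok := iter.Next(); ok; p, ok = iter.Next() {
			_ = p // a client.Point carrying the measurement, region/host tags, and a value field
			n++
		}
		fmt.Printf("series %q: %d points per pass\n", s.Measurement, n)
	}
}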


@@ -2,7 +2,7 @@ package runner
import (
"fmt"
"math/rand"
//"math/rand"
"net/url"
"strings"
"sync"
@@ -120,8 +120,9 @@ type Config struct {
// NewClient returns a pointer to an InfluxDB client for
// the test's `Write.Address` field. If an error is encountered
// when creating a new client, the error is returned.
func (cfg *Config) NewClient() (*client.Client, error) {
u, _ := url.Parse(fmt.Sprintf("http://%s", cfg.Address))
func (cfg *StressTest) NewClient() (*client.Client, error) {
fmt.Printf("CFG: %#v\n", cfg)
u, _ := url.Parse(fmt.Sprintf("http://%s", cfg.Write.Address))
c, err := client.NewClient(client.Config{URL: *u})
if err != nil {
return nil, err
@@ -133,7 +134,8 @@ func (cfg *Config) NewClient() (*client.Client, error) {
// It returns the total number of points that were written during the test,
// a slice of all of the stress test's response times,
// and the times that the test started and ended as a `Timer`.
func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes ResponseTimes, timer *Timer) {
func Run(cfg *StressTest) (totalPoints int, failedRequests int, responseTimes ResponseTimes, timer *Timer) {
timer = NewTimer()
defer timer.StopTimer()
@@ -142,7 +144,7 @@ func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes Respon
panic(err)
}
counter := NewConcurrencyLimiter(cfg.Concurrency)
counter := NewConcurrencyLimiter(cfg.Write.Concurrency)
var mu sync.Mutex
var wg sync.WaitGroup
@@ -155,25 +157,23 @@ func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes Respon
lastSuccess := true
batch := &client.BatchPoints{
Database: cfg.Database,
Database: cfg.Write.Database,
WriteConsistency: "any",
Time: time.Now(),
Precision: cfg.Precision,
Precision: cfg.Write.Precision,
}
for i := 1; i <= cfg.PointCount; i++ {
for j := 1; j <= cfg.SeriesCount; j++ {
for _, m := range cfg.Measurements {
p := client.Point{
Measurement: m,
Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)},
Fields: map[string]interface{}{"value": rand.Float64()},
}
for _, testSeries := range cfg.Series {
for i := 0; i < testSeries.PointCount; i++ {
iter := testSeries.Iter()
var p client.Point
for ok := true; ok; p, ok = iter.Next() {
batch.Points = append(batch.Points, p)
if len(batch.Points) >= cfg.BatchSize {
if len(batch.Points) >= cfg.Write.BatchSize {
wg.Add(1)
counter.Increment()
totalPoints += len(batch.Points)
go func(b *client.BatchPoints, total int) {
st := time.Now()
if _, err := c.Write(*b); err != nil {
@@ -194,16 +194,17 @@ func Run(cfg *Config) (totalPoints int, failedRequests int, responseTimes Respon
responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds())))
mu.Unlock()
}
time.Sleep(cfg.BatchInterval)
//time.Sleep(cfg.Write.BatchInterval)
time.Sleep(0 * time.Second)
wg.Done()
counter.Decrement()
if total%500000 == 0 {
fmt.Printf("%d total points. %d in %s\n", total, cfg.BatchSize, time.Since(st))
fmt.Printf("%d total points. %d in %s\n", total, cfg.Write.BatchSize, time.Since(st))
}
}(batch, totalPoints)
batch = &client.BatchPoints{
Database: cfg.Database,
Database: cfg.Write.Database,
WriteConsistency: "any",
Precision: "n",
Time: time.Now(),
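Two notes on this last hunk. The sleep is commented out because BatchInterval is now a plain string in the config; a sketch of how the interval (and likewise starting_point and the query time_frame) could still be honored by parsing it once before the write loop, with the placement and the panic-on-error handling as assumptions:

interval, err := time.ParseDuration(cfg.Write.BatchInterval) // e.g. "0s" or "10ms" from the toml
if err != nil {
	panic(err) // Run already panics on a bad client; a bad duration could be treated the same way
}
// ... then, in place of time.Sleep(0 * time.Second):
time.Sleep(interval)

Separately, the new `for ok := true; ok; p, ok = iter.Next()` loop runs its body once before the first call to Next(), so the first point appended for each pass is the zero-value client.Point; priming the loop with an initial Next() call, as in the sketch after the previous file, would avoid writing that empty point.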