diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 608a33dc83..6eaee05b76 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -3,125 +3,41 @@ package main import ( "flag" "fmt" - "runtime" - "sort" - "time" + "os" + "runtime/pprof" "github.com/influxdb/influxdb/stress" ) var ( - batchSize = flag.Int("batchsize", 0, "number of points per batch") - concurrency = flag.Int("concurrency", 0, "number of simultaneous writes to run") - batchInterval = flag.Duration("batchinterval", 0*time.Second, "duration between batches") - database = flag.String("database", "", "name of database") - address = flag.String("addr", "", "IP address and port of database (e.g., localhost:8086)") - precision = flag.String("precision", "", "The precision that points in the database will be with") - test = flag.String("test", "", "The stress test file") + //database = flag.String("database", "", "name of database") + //address = flag.String("addr", "", "IP address and port of database (e.g., localhost:8086)") + + config = flag.String("config", "", "The stress test file") + cpuprofile = flag.String("cpuprofile", "", "Write the cpu profile to `filename`") ) func main() { - var cfg *runner.Config - var err error - runtime.GOMAXPROCS(runtime.NumCPU()) flag.Parse() - if *test == "" { - fmt.Println("'-test' flag is required") - return + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + fmt.Println(err) + return + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() } - cfg, err = runner.DecodeFile(*test) + c, err := stress.NewConfig(*config) if err != nil { fmt.Println(err) return } - if *batchSize != 0 { - cfg.Write.BatchSize = *batchSize - } - - if *concurrency != 0 { - cfg.Write.Concurrency = *concurrency - } - - if *batchInterval != 0*time.Second { - cfg.Write.BatchInterval = batchInterval.String() - } - - if *database != "" { - cfg.Write.Database = *database - } - - if *address != "" { - cfg.Write.Address = *address - } - - if *precision != "" { - cfg.Write.Precision = *precision - } - - d := make(chan struct{}) - seriesQueryResults := make(chan runner.QueryResults) - - if cfg.SeriesQuery.Enabled { - go runner.SeriesQuery(cfg, d, seriesQueryResults) - } - - measurementQueryResults := make(chan runner.QueryResults) - - ts := make(chan time.Time) - if cfg.MeasurementQuery.Enabled { - go runner.MeasurementQuery(cfg, ts, measurementQueryResults) - } - - // Get the stress results - totalPoints, failedRequests, responseTimes, timer := runner.Run(cfg, d, ts) - - sort.Sort(sort.Reverse(sort.Interface(responseTimes))) - - total := int64(0) - for _, t := range responseTimes { - total += int64(t.Value) - } - mean := total / int64(len(responseTimes)) - - fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/timer.Elapsed().Seconds()) - fmt.Printf("%d requests failed for %d total points that didn't get posted.\n", failedRequests, failedRequests**batchSize) - fmt.Println("Average response time: ", time.Duration(mean)) - fmt.Println("Slowest response times:") - for _, r := range responseTimes[:100] { - fmt.Println(time.Duration(r.Value)) - } - - // Get series query results - if cfg.SeriesQuery.Enabled { - qrs := <-seriesQueryResults - - queryTotal := int64(0) - for _, qt := range qrs.ResponseTimes { - queryTotal += int64(qt.Value) - } - seriesQueryMean := queryTotal / int64(len(qrs.ResponseTimes)) - - fmt.Printf("Queried Series %d times with a average response time of %v milliseconds\n", qrs.TotalQueries, time.Duration(seriesQueryMean).Seconds()*1000) - - } - - // Get measurement query results - if cfg.MeasurementQuery.Enabled { - qrs := <-measurementQueryResults - - queryTotal := int64(0) - for _, qt := range qrs.ResponseTimes { - queryTotal += int64(qt.Value) - } - seriesQueryMean := queryTotal / int64(len(qrs.ResponseTimes)) - - fmt.Printf("Queried Measurement %d times with a average response time of %v milliseconds\n", qrs.TotalQueries, time.Duration(seriesQueryMean).Seconds()*1000) - - } + stress.Run(c) return diff --git a/stress/README.md b/stress/README.md new file mode 100644 index 0000000000..09c7bec91a --- /dev/null +++ b/stress/README.md @@ -0,0 +1,47 @@ +## Stress Test +The logic for `StressTest` can be found in `stress/run.go`. + +A new `StressTest` type was added and is composed four different parts. The `StressTest` type has one method `Start(wHandle responseHandler, rHandle responseHandler)`. This method starts the stress test. + +A `responseHandler` is a function with type signature `func(r <-chan response, t *Timer)`. Response Handlers handle the read and write responses respectively. + +### Provisioner +Provisions the InfluxDB instance where the stress test is going to be ran against. + +Think things like, creating the database, setting up retention policies, continuous queries, etc. + +### Writer +The `Writer` is responsible for Writing data into an InfluxDB instance. It has two components: `PointGenerator` and `InfluxClient`. + +##### PointGenerator +The `PointGenerator` is responsible for generating points that will be written into InfluxDB. Additionally, it is reponsible for keeping track of the latest timestamp of the points it is writing (Just incase the its needed by the `Reader`). + +Any type that implements the methods `Generate()` and `Time()` is a `PointGenerator`. + +##### InfluxClient +The `InfluxClient` is responsible for writing the data that is generated by the `PointGenerator`. + +Any type that implements `Batch(ps <-chan Point, r chan<- response)`, and `send(b []byte) response` is an `InfluxClient`. + +### Reader +The `Reader` is responsible for querying the database. It has two components: `QueryGenerator` and `QueryClient`. + +##### QueryGenerator +The `QueryGenerator` is responsible for generating queries. + +##### QueryClient +The `QueryClient` is responsible for executing queries against an InfluxDB instance. + +## Basic +`basic.go` implements an each of the components of a stress test. + +## Util +`util.go` contains utility methods used throughout the package. + +## Config +`config.go` contains the logic for managing the configuration of the stress test. + +A sample configuration file can be found in `stress/stress.toml`. This still needs work, but whats there now is good enough IMO. + +## Template +`template.go` contains the logic for a basic stress test. diff --git a/stress/basic.go b/stress/basic.go new file mode 100644 index 0000000000..d02712c788 --- /dev/null +++ b/stress/basic.go @@ -0,0 +1,523 @@ +package stress + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "sync" + "time" + + "github.com/influxdb/influxdb/client/v2" +) + +// AbstractTag is a struct that abstractly +// defines a tag +type AbstractTag struct { + Key string `toml:"key"` + Value string `toml:"value"` +} + +// AbstractTags is a slice of abstract tags +type AbstractTags []AbstractTag + +// Template returns a templated string of tags +func (t AbstractTags) Template() string { + var buf bytes.Buffer + for i, tag := range t { + if i == 0 { + buf.Write([]byte(fmt.Sprintf("%v=%v-%%v,", tag.Key, tag.Value))) + } else { + buf.Write([]byte(fmt.Sprintf("%v=%v,", tag.Key, tag.Value))) + } + } + + b := buf.Bytes() + b = b[0 : len(b)-1] + + return string(b) +} + +// AbstractField is a struct that abstractly +// defines a field +type AbstractField struct { + Key string `toml:"key"` + Type string `toml:"type"` +} + +// AbstractFields is a slice of abstract fields +type AbstractFields []AbstractField + +// Template returns a templated string of fields +func (f AbstractFields) Template() (string, []string) { + var buf bytes.Buffer + a := make([]string, len(f)) + for i, field := range f { + buf.Write([]byte(fmt.Sprintf("%v=%%v,", field.Key))) + a[i] = field.Type + } + + b := buf.Bytes() + b = b[0 : len(b)-1] + + return string(b), a +} + +// BasicPointGenerator implements the PointGenerator interface +type BasicPointGenerator struct { + Enabled bool `toml:"enabled"` + PointCount int `toml:"point_count"` + Tick string `toml:"tick"` + Jitter bool `toml:"jitter"` + Measurement string `toml:"measurement"` + SeriesCount int `toml:"series_count"` + Tags AbstractTags `toml:"tag"` + Fields AbstractFields `toml:"field"` + StartDate string `toml:"start_date"` + time time.Time + mu sync.Mutex +} + +// typeArr accepts a string array of types and +// returns an array of equal length where each +// element of the array is an instance of the type +// expressed in the string array. +func typeArr(a []string) []interface{} { + i := make([]interface{}, len(a)) + for j, ty := range a { + var t string + switch ty { + case "float64": + t = fmt.Sprintf("%v", rand.Intn(1000)) + case "int": + t = fmt.Sprintf("%vi", rand.Intn(1000)) + case "bool": + b := rand.Intn(2) == 1 + t = fmt.Sprintf("%t", b) + default: + t = fmt.Sprintf("%v", rand.Intn(1000)) + } + i[j] = t + } + + return i +} + +// Template returns a function that returns a pointer to a Pnt. +func (b *BasicPointGenerator) Template() func(i int, t time.Time) *Pnt { + ts := b.Tags.Template() + fs, fa := b.Fields.Template() + tmplt := fmt.Sprintf("%v,%v %v %%v", b.Measurement, ts, fs) + + return func(i int, t time.Time) *Pnt { + p := &Pnt{} + arr := []interface{}{i} + arr = append(arr, typeArr(fa)...) + arr = append(arr, t.UnixNano()) + + str := fmt.Sprintf(tmplt, arr...) + p.Set([]byte(str)) + return p + } +} + +// Pnt is a struct that implements the Point interface. +type Pnt struct { + line []byte +} + +// Set sets the internal state for a Pnt. +func (p *Pnt) Set(b []byte) { + p.line = b +} + +// Next generates very simple points very +// efficiently. +// TODO: Take this out +func (p *Pnt) Next(i int, t time.Time) { + p.line = []byte(fmt.Sprintf("a,b=c-%v v=%v", i, i)) +} + +// Line returns a byte array for a point +// in line protocol format. +func (p Pnt) Line() []byte { + return p.line +} + +// Graphite returns a byte array for a point +// in graphite format. +func (p Pnt) Graphite() []byte { + // TODO: Implement + return []byte("") +} + +// OpenJSON returns a byte array for a point +// in opentsdb json format +func (p Pnt) OpenJSON() []byte { + // TODO: Implement + return []byte("") +} + +// OpenTelnet returns a byte array for a point +// in opentsdb-telnet format +func (p Pnt) OpenTelnet() []byte { + // TODO: Implement + return []byte("") +} + +// Generate returns a point channel. Implements the +// Generate method for the PointGenerator interface +func (b *BasicPointGenerator) Generate() (<-chan Point, error) { + // TODO: should be 1.5x batch size + c := make(chan Point, 15000) + tmplt := b.Template() + + go func(c chan Point) { + defer close(c) + + start, err := time.Parse("2006-Jan-02", b.StartDate) + if err != nil { + fmt.Println(err) + return + } + + b.mu.Lock() + b.time = start + b.mu.Unlock() + + tick, err := time.ParseDuration(b.Tick) + if err != nil { + fmt.Println(err) + return + } + + for i := 0; i < b.PointCount; i++ { + b.mu.Lock() + b.time = b.time.Add(tick) + b.mu.Unlock() + + for j := 0; j < b.SeriesCount; j++ { + p := tmplt(j, b.time) + + c <- *p + } + } + }(c) + + return c, nil +} + +// Time returns the timestamp for the latest points +// that are being generated. Implements the Time method +// for the PointGenerator interface. +func (b *BasicPointGenerator) Time() time.Time { + defer b.mu.Unlock() + b.mu.Lock() + t := b.time + return t +} + +// BasicClient implements the InfluxClient +// interface. +type BasicClient struct { + Enabled bool `toml:"enabled"` + Address string `toml:"address"` + Database string `toml:"database"` + Precision string `toml:"precision"` + BatchSize int `toml:"batch_size"` + BatchInterval string `toml:"batch_interval"` + Concurrency int `toml:"concurrency"` + SSL bool `toml:"ssl"` + Format string `toml:"format"` +} + +// Batch groups together points +func (c *BasicClient) Batch(ps <-chan Point, r chan<- response) error { + var buf bytes.Buffer + var wg sync.WaitGroup + counter := NewConcurrencyLimiter(c.Concurrency) + + interval, err := time.ParseDuration(c.BatchInterval) + if err != nil { + return err + } + + ctr := 0 + + for p := range ps { + b := p.Line() + ctr++ + + buf.Write(b) + buf.Write([]byte("\n")) + + if ctr%c.BatchSize == 0 && ctr != 0 { + b := buf.Bytes() + + // Trimming the trailing newline character + b = b[0 : len(b)-1] + + wg.Add(1) + counter.Increment() + go func(byt []byte) { + defer wg.Done() + + rs, err := c.send(byt) + if err != nil { + fmt.Println(err) + } + time.Sleep(interval) + + counter.Decrement() + r <- rs + }(b) + + var temp bytes.Buffer + buf = temp + } + + } + + wg.Wait() + + return nil +} + +// post sends a post request with a payload of points +func post(url string, datatype string, data io.Reader) (*http.Response, error) { + resp, err := http.Post(url, datatype, data) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf(string(body)) + } + + return resp, nil +} + +// Send calls post and returns a response +func (c *BasicClient) send(b []byte) (response, error) { + instanceURL := fmt.Sprintf("http://%v/write?db=%v&precision=%v", c.Address, c.Database, c.Precision) + + t := NewTimer() + resp, err := post(instanceURL, "application/x-www-form-urlencoded", bytes.NewBuffer(b)) + t.StopTimer() + if err != nil { + return response{Timer: t}, err + } + + r := response{ + Resp: resp, + Time: time.Now(), + Timer: t, + } + + return r, nil +} + +// BasicQuery implements the QueryGenerator interface +type BasicQuery struct { + Template Query `toml:"template"` + QueryCount int `toml:"query_count"` + time time.Time +} + +// QueryGenerate returns a Query channel +func (q *BasicQuery) QueryGenerate(now func() time.Time) (<-chan Query, error) { + c := make(chan Query, 0) + + go func(chan Query) { + defer close(c) + + for i := 0; i < q.QueryCount; i++ { + c <- Query(fmt.Sprintf(string(q.Template), i)) + } + + }(c) + + return c, nil +} + +// SetTime sets the internal state of time +func (q *BasicQuery) SetTime(t time.Time) { + q.time = t + return +} + +// BasicQueryClient implements the QueryClient interface +type BasicQueryClient struct { + Address string `toml:"address"` + Database string `toml:"database"` + QueryInterval string `toml:"query_interval"` + Concurrency int `toml:"concurrency"` + client client.Client +} + +// Init initializes the InfluxDB client +func (b *BasicQueryClient) Init() error { + u, err := url.Parse(fmt.Sprintf("http://%v", b.Address)) + if err != nil { + return err + } + cl := client.NewClient(client.Config{ + URL: u, + }) + + b.client = cl + + return nil +} + +// Query runs the query +func (b *BasicQueryClient) Query(cmd Query) (response, error) { + q := client.Query{ + Command: string(cmd), + Database: b.Database, + } + + t := NewTimer() + _, err := b.client.Query(q) + t.StopTimer() + + if err != nil { + return response{Timer: t}, err + } + + // Needs actual response type + r := response{ + Time: time.Now(), + Timer: t, + } + + return r, nil + +} + +// Exec listens to the query channel an executes queries as they come in +func (b *BasicQueryClient) Exec(qs <-chan Query, r chan<- response) error { + var wg sync.WaitGroup + counter := NewConcurrencyLimiter(b.Concurrency) + + b.Init() + + interval, err := time.ParseDuration(b.QueryInterval) + if err != nil { + return err + } + + for q := range qs { + wg.Add(1) + counter.Increment() + func(q Query) { + defer wg.Done() + qr, _ := b.Query(q) + r <- qr + time.Sleep(interval) + counter.Decrement() + }(q) + } + + wg.Wait() + + return nil +} + +// resetDB will drop an create a new database on an existing +// InfluxDB instance. +func resetDB(c client.Client, database string) error { + _, err := c.Query(client.Query{ + // Change to DROP DATABASE %s IF EXISTS + Command: fmt.Sprintf("DROP DATABASE %s", database), + }) + if err != nil { + return err + } + + _, err = c.Query(client.Query{ + Command: fmt.Sprintf("CREATE DATABASE %s", database), + }) + if err != nil { + return err + } + + return nil +} + +// BasicProvisioner implements the Provisioner +// interface. +type BasicProvisioner struct { + Enabled bool `toml:"enabled"` + Address string `toml:"address"` + Database string `toml:"database"` + ResetDatabase bool `toml:"reset_database"` +} + +// Provision runs the resetDB function. +func (b *BasicProvisioner) Provision() error { + u, err := url.Parse(fmt.Sprintf("http://%v", b.Address)) + if err != nil { + return err + } + cl := client.NewClient(client.Config{ + URL: u, + }) + + if b.ResetDatabase { + resetDB(cl, b.Database) + } + + return nil +} + +// BasicWriteHandler handles write responses. +func BasicWriteHandler(rs <-chan response, wt *Timer) { + n := 0 + success := 0 + fail := 0 + + s := time.Duration(0) + + for t := range rs { + + n++ + + if t.Success() { + success++ + } else { + fail++ + } + + s += t.Timer.Elapsed() + + } + + fmt.Printf("Total Requests: %v\n", n) + fmt.Printf(" Success: %v\n", success) + fmt.Printf(" Fail: %v\n", fail) + fmt.Printf("Average Response Time: %v\n", s/time.Duration(n)) + fmt.Printf("Points Per Second: %v\n\n", float64(n)*float64(10000)/float64(wt.Elapsed().Seconds())) +} + +// BasicReadHandler handles read responses. +func BasicReadHandler(r <-chan response, rt *Timer) { + n := 0 + s := time.Duration(0) + for t := range r { + n++ + s += t.Timer.Elapsed() + } + + fmt.Printf("Total Queries: %v\n", n) + fmt.Printf("Average Query Response Time: %v\n\n", s/time.Duration(n)) +} diff --git a/stress/config.go b/stress/config.go index 8012f2c391..fcf55b2543 100644 --- a/stress/config.go +++ b/stress/config.go @@ -1,137 +1,72 @@ -package runner +package stress import ( - "bytes" - "fmt" - "math/rand" - "time" - "github.com/BurntSushi/toml" ) -// tag is a struct that contains data -// about a tag for in a series -type tag struct { - Key string `toml:"key"` - Value string `toml:"value"` -} - -// tag is a struct that contains data -// about a field for in a series -type field struct { - Key string `toml:"key"` - Type string `toml:"type"` -} - -// series is a struct that contains data -// about the series that will be written -// during a stress test -type series struct { - PointCount int `toml:"point_count"` - Tick string `toml:"tick"` - Jitter bool `toml:"jitter"` - Measurement string `toml:"measurement"` - SeriesCount int `toml:"series_count"` - TagCount int `toml:"tag_count"` - Tags []tag `toml:"tag"` - Fields []field `toml:"field"` -} - -// write is a struct that contains the business -// logic for the stress test. e.g. where the -// influxdb instance is running, what database -// should points be written into -type write struct { - Concurrency int `toml:"concurrency"` - BatchSize int `toml:"batch_size"` - BatchInterval string `toml:"batch_interval"` - Database string `toml:"database"` - ResetDatabase bool `toml:"reset_database"` - Address string `toml:"address"` - Precision string `toml:"precision"` - StartDate string `toml:"start_date"` -} - -// query is a struct that contains the logic for -// a query that will be ran on during the stress -// test -type query struct { - Enabled bool `toml:"enabled"` - Concurrency int `toml:"concurrency"` - Aggregates []string `toml:"aggregates"` - Fields []string `toml:"fields"` -} - -// measurementQuery is a struct that contains -// the logic that runs a query against a measurement -// over a time period that is specified by -// `Offset` -type measurementQuery struct { - query - Offset string `toml:"offset"` -} - -// seriesQuery is a struct that contains -// the logic that runs a query against a single -// series -type seriesQuery struct { - query - Interval string `toml:"interval"` -} - -// Config is a struct that is passed into the `Run()` function. +// Config is a struct for the Stress test configuration type Config struct { - Write write `toml:"write"` - Series []series `toml:"series"` - MeasurementQuery measurementQuery `toml:"measurement_query"` - SeriesQuery seriesQuery `toml:"series_query"` - ChannelBufferSize int `toml:"channel_buffer_size"` - SSL bool `toml:"ssl"` + Provision Provision `toml:"provision"` + Write Write `toml:"write"` + Read Read `toml:"read"` } -// NewSeries takes a measurement, and point count, -// and a series count and returns a series -func NewSeries(m string, p int, sc int) series { - s := series{ - PointCount: p, - SeriesCount: sc, - Tick: "1s", - Measurement: m, - Tags: []tag{ - tag{ - Key: "host", - Value: "server", - }, - }, - Fields: []field{ - field{ - Key: "value", - }, - }, - } - - return s +// Provision is a struct that contains the configuration +// parameters for all implemented Provisioner's. +type Provision struct { + Basic BasicProvisioner `toml:"basic"` } -// NewConfig returns a pointer to a config -// with some default parameters set -func NewConfig() *Config { +// Write is a struct that contains the configuration +// parameters for the stress test Writer. +type Write struct { + PointGenerators PointGenerators `toml:"point_generator"` + InfluxClients InfluxClients `toml:"influx_client"` +} - w := write{ - Concurrency: 10, - BatchSize: 5000, - BatchInterval: "0s", - Database: "stress", - ResetDatabase: true, - Address: "localhost:8086", - Precision: "n", +// PointGenerators is a struct that contains the configuration +// parameters for all implemented PointGenerator's. +type PointGenerators struct { + Basic BasicPointGenerator `toml:"basic"` +} + +// InfluxClients is a struct that contains the configuration +// parameters for all implemented InfluxClient's. +type InfluxClients struct { + Basic BasicClient `toml:"basic"` +} + +// Read is a struct that contains the configuration +// parameters for the stress test Reader. +type Read struct { + QueryGenerators QueryGenerators `toml:"query_generator"` + QueryClients QueryClients `toml:"query_client"` +} + +// QueryGenerators is a struct that contains the configuration +// parameters for all implemented QueryGenerator's. +type QueryGenerators struct { + Basic BasicQuery `toml:"basic"` +} + +// QueryClients is a struct that contains the configuration +// parameters for all implemented QueryClient's. +type QueryClients struct { + Basic BasicQueryClient `toml:"basic"` +} + +// NewConfig returns a pointer to a Config +func NewConfig(s string) (*Config, error) { + var c *Config + var err error + + if s == "" { + c, err = BasicStress() + } else { + c, err = DecodeFile(s) } - c := &Config{ - Write: w, - } - - return c + return c, err } // DecodeFile takes a file path for a toml config file @@ -144,138 +79,18 @@ func DecodeFile(s string) (*Config, error) { return nil, err } - // Initialize Config struct - // NOTE: Not happy with the implementation - // but it will do for now - for j, srs := range t.Series { - for i := 0; i < srs.TagCount; i++ { + return t, nil +} - tag := tag{ - Key: fmt.Sprintf("tag-key-%d", i), - Value: "tag-value", - } +// DecodeConfig takes a file path for a toml config file +// and returns a pointer to a Config Struct. +func DecodeConfig(s string) (*Config, error) { + t := &Config{} - srs.Tags = append(srs.Tags, tag) - fmt.Println(srs) - } - - t.Series[j] = srs + // Decode the toml file + if _, err := toml.Decode(s, t); err != nil { + return nil, err } return t, nil } - -// seriesIter is a struct that contains a -// series and a count, where count is the -//number of points that have been written -// for the series `s` -type seriesIter struct { - s *series - count int - timestamp time.Time - precision string -} - -// writeInterval returns a timestamp for the current time -// interval -func (s *series) writeInterval(i int, start time.Time) time.Time { - var tick time.Duration - var j int - var err error - - tick, err = time.ParseDuration(s.Tick) - if err != nil { - panic(err) - } - - if s.Jitter { - j = rand.Intn(int(tick)) - if j%2 == 0 { - j = -2 * j - } - } - - tick = tick*time.Duration(i) + time.Duration(j) - - return start.Add(tick) -} - -// Iter returns a pointer to a seriesIter -func (s *series) Iter(i int, start time.Time, p string) *seriesIter { - - return &seriesIter{s: s, count: -1, timestamp: s.writeInterval(i, start), precision: p} -} - -// Next returns a new point for a series. -// Currently, there is an off by one bug here. -func (iter *seriesIter) Next() ([]byte, bool) { - var buf bytes.Buffer - iter.count++ - - buf.Write([]byte(fmt.Sprintf("%v,", iter.s.Measurement))) - buf.Write(iter.s.newTagSet(iter.count)) - buf.Write([]byte(" ")) - buf.Write(iter.s.newFieldSet(iter.count)) - buf.Write([]byte(" ")) - - switch iter.precision { - case "s": - buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.Unix()))) - default: - buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.UnixNano()))) - } - - b := iter.count < iter.s.SeriesCount - byt := buf.Bytes() - - return byt, b -} - -// newTagSet returns a byte array representation -// of the tagset for a series -func (s *series) newTagSet(c int) []byte { - var buf bytes.Buffer - for _, tag := range s.Tags { - buf.Write([]byte(fmt.Sprintf("%v=%v-%v,", tag.Key, tag.Value, c))) - } - - b := buf.Bytes() - b = b[0 : len(b)-1] - - return b -} - -// newFieldSet returns a byte array representation -// of the field-set for a series -func (s *series) newFieldSet(c int) []byte { - var buf bytes.Buffer - - for _, field := range s.Fields { - switch field.Type { - case "float64-flat": - if rand.Intn(10) > 2 { - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, 100))) - } else { - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, 100+rand.Intn(100)))) - } - case "float64-inc+": - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, c+rand.Intn(2)))) - case "float64-inc": - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, c))) - case "float64": - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, rand.Intn(1000)))) - case "int": - buf.Write([]byte(fmt.Sprintf("%v=%vi,", field.Key, rand.Intn(1000)))) - case "bool": - b := rand.Intn(2) == 1 - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, b))) - default: - buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, rand.Intn(1000)))) - } - } - - b := buf.Bytes() - b = b[0 : len(b)-1] - - return b -} diff --git a/stress/query.go b/stress/query.go deleted file mode 100644 index 832d4d2976..0000000000 --- a/stress/query.go +++ /dev/null @@ -1,138 +0,0 @@ -package runner - -import ( - "fmt" - "sync" - "time" - - "github.com/influxdb/influxdb/client" -) - -// QueryResults holds the total number of executed queries -// and the response time for each query -type QueryResults struct { - TotalQueries int - ResponseTimes ResponseTimes -} - -func SeriesQuery(cfg *Config, done chan struct{}, results chan QueryResults) { - - var mu sync.Mutex - var wg sync.WaitGroup - responseTimes := make(ResponseTimes, 0) - totalQueries := 0 - - c, err := cfg.NewClient() - if err != nil { - panic(err) - } - - counter := NewConcurrencyLimiter(cfg.SeriesQuery.Concurrency) - for _, s := range cfg.Series { - for i := 0; i < s.SeriesCount; i++ { - for _, a := range cfg.SeriesQuery.Aggregates { - for _, f := range cfg.SeriesQuery.Fields { - q := fmt.Sprintf("SELECT %v(%v) FROM %v WHERE %v='%v-%v'", a, f, s.Measurement, s.Tags[0].Key, s.Tags[0].Value, i) - - wg.Add(1) - counter.Increment() - totalQueries += 1 - - go func(q string) { - - select { - case <-done: - results <- QueryResults{ - TotalQueries: totalQueries, - ResponseTimes: responseTimes, - } - default: - st := time.Now() - _, err := c.Query(client.Query{ - Command: q, - Database: cfg.Write.Database, - }) - if err != nil { - fmt.Println("Error") - } - mu.Lock() - totalQueries += 1 - responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) - mu.Unlock() - wg.Done() - t, err := time.ParseDuration(cfg.SeriesQuery.Interval) - time.Sleep(t) - } - - counter.Decrement() - - }(q) - - } - } - } - } - - wg.Wait() - - return - -} - -func MeasurementQuery(cfg *Config, timestamp chan time.Time, results chan QueryResults) { - - var wg sync.WaitGroup - responseTimes := make(ResponseTimes, 0) - totalQueries := 0 - - c, err := cfg.NewClient() - if err != nil { - panic(err) - } - - for _, s := range cfg.Series { - for _, a := range cfg.MeasurementQuery.Aggregates { - for _, f := range cfg.MeasurementQuery.Fields { - - wg.Add(1) - - go func(a string, f string, m string) { - - tsp := <-timestamp - - offset, err := time.ParseDuration(cfg.MeasurementQuery.Offset) - if err != nil { - panic(err) - } - - q := fmt.Sprintf("SELECT %v(%v) FROM %v WHERE time>=%vs", a, f, m, tsp.Add(-1*offset).Unix()) - st := time.Now() - - _, err = c.Query(client.Query{ - Command: q, - Database: cfg.Write.Database, - }) - - if err != nil { - fmt.Println("Error") - } - - totalQueries += 1 - responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) - wg.Done() - - }(a, f, s.Measurement) - - } - } - } - - wg.Wait() - results <- QueryResults{ - TotalQueries: totalQueries, - ResponseTimes: responseTimes, - } - - return - -} diff --git a/stress/run.go b/stress/run.go new file mode 100644 index 0000000000..c0c877ae8d --- /dev/null +++ b/stress/run.go @@ -0,0 +1,344 @@ +package stress + +import ( + "bytes" + "fmt" + "net/http" + "sync" + "time" +) + +// Run handles the logic for running a stress test given a config file +func Run(c *Config) { + w := NewWriter(&c.Write.PointGenerators.Basic, &c.Write.InfluxClients.Basic) + r := NewQuerier(&c.Read.QueryGenerators.Basic, &c.Read.QueryClients.Basic) + s := NewStressTest(&c.Provision.Basic, w, r) + + s.Start(BasicWriteHandler, BasicReadHandler) +} + +// Point is an interface that is used to represent +// the abstract idea of a point in InfluxDB. +type Point interface { + Line() []byte + Graphite() []byte + OpenJSON() []byte + OpenTelnet() []byte +} + +/////////////////////////////////////////////////// +// Example Implementation of the Point Interface // +/////////////////////////////////////////////////// + +// KeyValue is an intermediate type that is used +// to express Tag and Field similarly. +type KeyValue struct { + Key string + Value string +} + +// Tag is a struct for a tag in influxdb. +type Tag KeyValue + +// Field is a struct for a field in influxdb. +type Field KeyValue + +// Tags is an slice of all the tags for a point. +type Tags []Tag + +// Fields is an slice of all the fields for a point. +type Fields []Field + +// tagset returns a byte array for a points tagset. +func (t Tags) tagset() []byte { + var buf bytes.Buffer + for _, tag := range t { + buf.Write([]byte(fmt.Sprintf("%v=%v,", tag.Key, tag.Value))) + } + + b := buf.Bytes() + b = b[0 : len(b)-1] + + return b +} + +// fieldset returns a byte array for a points fieldset. +func (f Fields) fieldset() []byte { + var buf bytes.Buffer + for _, field := range f { + buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, field.Value))) + } + + b := buf.Bytes() + b = b[0 : len(b)-1] + + return b +} + +// StdPoint represents a point in InfluxDB +type StdPoint struct { + Measurement string + Tags Tags + Fields Fields + Timestamp int64 +} + +// Line returns a byte array for a point in +// line-protocol format +func (p StdPoint) Line() []byte { + var buf bytes.Buffer + + buf.Write([]byte(fmt.Sprintf("%v,", p.Measurement))) + buf.Write(p.Tags.tagset()) + buf.Write([]byte(" ")) + buf.Write(p.Fields.fieldset()) + buf.Write([]byte(" ")) + buf.Write([]byte(fmt.Sprintf("%v", p.Timestamp))) + + byt := buf.Bytes() + + return byt +} + +// Graphite returns a byte array for a point +// in graphite-protocol format +func (p StdPoint) Graphite() []byte { + // TODO: implement + // timestamp is at second level resolution + // but can be specified as a float to get nanosecond + // level precision + t := "tag_1.tag_2.measurement[.field] acutal_value timestamp" + return []byte(t) +} + +// OpenJSON returns a byte array for a point +// in JSON format +func (p StdPoint) OpenJSON() []byte { + // TODO: implement + //[ + // { + // "metric": "sys.cpu.nice", + // "timestamp": 1346846400, + // "value": 18, + // "tags": { + // "host": "web01", + // "dc": "lga" + // } + // }, + // { + // "metric": "sys.cpu.nice", + // "timestamp": 1346846400, + // "value": 9, + // "tags": { + // "host": "web02", + // "dc": "lga" + // } + // } + //] + return []byte("hello") +} + +// OpenTelnet returns a byte array for a point +// in OpenTSDB-telnet format +func (p StdPoint) OpenTelnet() []byte { + // TODO: implement + // timestamp can be 13 digits at most + // sys.cpu.nice timestamp value tag_key_1=tag_value_1 tag_key_2=tag_value_2 + return []byte("hello") +} + +//////////////////////////////////////// + +// response is the results making +// a request to influxdb. +type response struct { + Resp *http.Response + Time time.Time + Timer *Timer +} + +// Success returns true if the request +// was successful and false otherwise. +func (r response) Success() bool { + // ADD success for tcp, udp, etc + return !(r.Resp == nil || r.Resp.StatusCode != 204) +} + +// WriteResponse is a response for a Writer +type WriteResponse response + +// QueryResponse is a response for a Querier +type QueryResponse struct { + response + Body string +} + +/////////////////////////////// +// Definition of the Writer /// +/////////////////////////////// + +// PointGenerator is an interface for generating points. +type PointGenerator interface { + Generate() (<-chan Point, error) + Time() time.Time +} + +// InfluxClient is an interface for writing data to the database. +type InfluxClient interface { + Batch(ps <-chan Point, r chan<- response) error + send(b []byte) (response, error) + //ResponseHandler +} + +// Writer is a PointGenerator and an InfluxClient. +type Writer struct { + PointGenerator + InfluxClient +} + +// NewWriter returns a Writer. +func NewWriter(p PointGenerator, i InfluxClient) Writer { + w := Writer{ + PointGenerator: p, + InfluxClient: i, + } + + return w +} + +//////////////////////////////// +// Definition of the Querier /// +//////////////////////////////// + +// Query is query +type Query string + +// QueryGenerator is an interface that is used +// to define queries that will be ran on the DB. +type QueryGenerator interface { + QueryGenerate(f func() time.Time) (<-chan Query, error) + SetTime(t time.Time) +} + +// QueryClient is an interface that can write a query +// to an InfluxDB instance. +type QueryClient interface { + Query(q Query) (response, error) + Exec(qs <-chan Query, r chan<- response) error +} + +// Querier queries the database. +type Querier struct { + QueryGenerator + QueryClient +} + +// NewQuerier returns a Querier. +func NewQuerier(q QueryGenerator, c QueryClient) Querier { + r := Querier{ + QueryGenerator: q, + QueryClient: c, + } + + return r +} + +/////////////////////////////////// +// Definition of the Provisioner // +/////////////////////////////////// + +// Provisioner is an interface that provisions an +// InfluxDB instance +type Provisioner interface { + Provision() error +} + +///////////////////////////////// +// Definition of StressTest ///// +///////////////////////////////// + +// StressTest is a struct that contains all of +// the logic required to execute a Stress Test +type StressTest struct { + Provisioner + Writer + Querier +} + +// responseHandler +type responseHandler func(r <-chan response, t *Timer) + +// Start executes the Stress Test +func (s *StressTest) Start(wHandle responseHandler, rHandle responseHandler) { + var wg sync.WaitGroup + + // Provision the Instance + s.Provision() + + wg.Add(1) + // Starts Writing + go func() { + r := make(chan response, 0) + wt := NewTimer() + + go func() { + defer wt.StopTimer() + defer close(r) + p, err := s.Generate() + if err != nil { + fmt.Println(err) + return + } + + err = s.Batch(p, r) + if err != nil { + fmt.Println(err) + return + } + }() + + // Write Results Handler + wHandle(r, wt) + wg.Done() + }() + + wg.Add(1) + // Starts Querying + go func() { + r := make(chan response, 0) + rt := NewTimer() + + go func() { + defer rt.StopTimer() + defer close(r) + q, err := s.QueryGenerate(s.Time) + if err != nil { + fmt.Println(err) + return + } + + err = s.Exec(q, r) + if err != nil { + fmt.Println(err) + return + } + }() + + // Read Results Handler + rHandle(r, rt) + wg.Done() + }() + + wg.Wait() +} + +// NewStressTest returns an instance of a StressTest +func NewStressTest(p Provisioner, w Writer, r Querier) StressTest { + s := StressTest{ + Provisioner: p, + Writer: w, + Querier: r, + } + + return s +} diff --git a/stress/runner.go b/stress/runner.go deleted file mode 100644 index 1c758a065c..0000000000 --- a/stress/runner.go +++ /dev/null @@ -1,360 +0,0 @@ -package runner - -import ( - "bytes" - "fmt" - "io" - "io/ioutil" - "net/http" - "strings" - "sync" - "time" - - "github.com/influxdb/influxdb/client" -) - -func post(url string, datatype string, data io.Reader) error { - - resp, err := http.Post(url, datatype, data) - if err != nil { - return err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { - return fmt.Errorf(string(body)) - } - - return nil -} - -// Timer is struct that can be used to track elaspsed time -type Timer struct { - start time.Time - end time.Time -} - -// Start returns a Timers start field -func (t *Timer) Start() time.Time { - return t.start -} - -// End returns a Timers end field -func (t *Timer) End() time.Time { - return t.end -} - -// StartTimer sets a timers `start` field to the current time -func (t *Timer) StartTimer() { - t.start = time.Now() -} - -// StopTimer sets a timers `end` field to the current time -func (t *Timer) StopTimer() { - t.end = time.Now() -} - -// Elapsed returns the total elapsed time between the `start` -// and `end` fields on a timer. -func (t *Timer) Elapsed() time.Duration { - return t.end.Sub(t.start) -} - -// NewTimer returns a pointer to a `Timer` struct where the -// timers `start` field has been set to `time.Now()` -func NewTimer() *Timer { - t := &Timer{} - t.StartTimer() - return t -} - -// ResponseTime is a struct that contains `Value` -// `Time` pairing. -type ResponseTime struct { - Value int - Time time.Time -} - -// NewResponseTime returns a new response time -// with value `v` and time `time.Now()`. -func NewResponseTime(v int) ResponseTime { - r := ResponseTime{Value: v, Time: time.Now()} - return r -} - -// ResponseTimes is a slice of response times -type ResponseTimes []ResponseTime - -// Implements the `Len` method for the -// sort.Interface type -func (rs ResponseTimes) Len() int { - return len(rs) -} - -// Implements the `Less` method for the -// sort.Interface type -func (rs ResponseTimes) Less(i, j int) bool { - return rs[i].Value < rs[j].Value -} - -// Implements the `Swap` method for the -// sort.Interface type -func (rs ResponseTimes) Swap(i, j int) { - rs[i], rs[j] = rs[j], rs[i] -} - -// Measurements holds all measurement results of the stress test -type Measurements []string - -// String returns a string and implements the `String` method for -// the flag.Value interface. -func (ms *Measurements) String() string { - return fmt.Sprint(*ms) -} - -// Set implements the `Set` method for the flag.Value -// interface. Set splits a string of comma separated values -// into a `Measurement`. -func (ms *Measurements) Set(value string) error { - values := strings.Split(value, ",") - for _, m := range values { - *ms = append(*ms, m) - } - return nil -} - -// NewClient returns a pointer to an InfluxDB client for -// a `Config`'s `Address` field. If an error is encountered -// when creating a new client, the function panics. -func (cfg *Config) NewClient() (*client.Client, error) { - u, _ := client.ParseConnectionString(cfg.Write.Address, cfg.SSL) - c, err := client.NewClient(client.Config{URL: u}) - if err != nil { - return nil, err - } - return c, nil -} - -func resetDB(c *client.Client, database string) error { - _, err := c.Query(client.Query{ - Command: fmt.Sprintf("DROP DATABASE %s", database), - }) - - if err != nil && !strings.Contains(err.Error(), "database not found") { - return err - } - - return nil -} - -// Run runs the stress test that is specified by a `Config`. -// It returns the total number of points that were during the test, -// an slice of all of the stress tests response times, -// and the times that the test started at and ended as a `Timer` -func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, failedRequests int, responseTimes ResponseTimes, timer *Timer) { - - c, err := cfg.NewClient() - if err != nil { - panic(err) - } - - if cfg.Write.ResetDatabase { - resetDB(c, cfg.Write.Database) - } - - _, err = c.Query(client.Query{ - Command: fmt.Sprintf("CREATE DATABASE %s", cfg.Write.Database), - }) - - if err != nil && !strings.Contains(err.Error(), "database already exists") { - fmt.Println(err) - } - - counter := NewConcurrencyLimiter(cfg.Write.Concurrency) - - var mu sync.Mutex - var wg sync.WaitGroup - responseTimes = make(ResponseTimes, 0) - - failedRequests = 0 - - totalPoints = 0 - - lastSuccess := true - - ch := make(chan []byte, cfg.ChannelBufferSize) - - go func() { - var buf bytes.Buffer - num := 0 - for _, s := range cfg.Series { - num += s.PointCount * s.SeriesCount - - } - - if cfg.MeasurementQuery.Enabled { - num = num / (len(cfg.Series) * len(cfg.MeasurementQuery.Aggregates) * len(cfg.MeasurementQuery.Fields)) - } - - ctr := 0 - - start, err := time.Parse("2006-Jan-02", cfg.Write.StartDate) - if err != nil { - start, err = time.Parse("Jan 2, 2006 at 3:04pm (MST)", cfg.Write.StartDate) - if err != nil { - start = time.Now() - } - } - - for _, testSeries := range cfg.Series { - for i := 0; i < testSeries.PointCount; i++ { - iter := testSeries.Iter(i, start, cfg.Write.Precision) - p, ok := iter.Next() - for ok { - ctr++ - buf.Write(p) - buf.Write([]byte("\n")) - if ctr != 0 && ctr%cfg.Write.BatchSize == 0 { - b := buf.Bytes() - - b = b[0 : len(b)-2] - - ch <- b - var b2 bytes.Buffer - buf = b2 - } - - if cfg.MeasurementQuery.Enabled && ctr%num == 0 { - select { - case ts <- time.Now(): - default: - } - } - - p, ok = iter.Next() - } - } - } - - close(ch) - - }() - - fmt.Println("Filling the Point Channel Buffer...") - fmt.Printf("Test will begin in %v seconds\n", (time.Duration(cfg.ChannelBufferSize/10) * time.Millisecond).Seconds()) - time.Sleep(time.Duration(cfg.ChannelBufferSize/10) * time.Millisecond) - fmt.Println("Starting Stress...") - - timer = NewTimer() - - for pnt := range ch { - - wg.Add(1) - counter.Increment() - totalPoints += cfg.Write.BatchSize - - protocol := "http" - - if cfg.SSL { - protocol = fmt.Sprintf("%vs", protocol) - } - - instanceURL := fmt.Sprintf("%v://%v/write?db=%v&precision=%v", protocol, cfg.Write.Address, cfg.Write.Database, cfg.Write.Precision) - - go func(b *bytes.Buffer, total int) { - st := time.Now() - err := post(instanceURL, "application/x-www-form-urlencoded", b) - if err != nil { // Should retry write if failed - mu.Lock() - if lastSuccess { - fmt.Println("ERROR: ", err.Error()) - } - failedRequests += 1 - totalPoints -= cfg.Write.BatchSize - lastSuccess = false - mu.Unlock() - } else { - mu.Lock() - if !lastSuccess { - fmt.Println("success in ", time.Since(st)) - } - lastSuccess = true - responseTimes = append(responseTimes, NewResponseTime(int(time.Since(st).Nanoseconds()))) - mu.Unlock() - } - batchInterval, _ := time.ParseDuration(cfg.Write.BatchInterval) - time.Sleep(batchInterval) - wg.Done() - counter.Decrement() - if total%500000 == 0 { - fmt.Printf("%d total points. %d in %s\n", total, cfg.Write.BatchSize, time.Since(st)) - } - }(bytes.NewBuffer(pnt), totalPoints) - - } - - wg.Wait() - - timer.StopTimer() - - if cfg.SeriesQuery.Enabled { - done <- struct{}{} - } - - return -} - -// ConcurrencyLimiter is a go routine safe struct that can be used to -// ensure that no more than a specifid max number of goroutines are -// executing. -type ConcurrencyLimiter struct { - inc chan chan struct{} - dec chan struct{} - max int - count int -} - -// NewConcurrencyLimiter returns a configured limiter that will -// ensure that calls to Increment will block if the max is hit. -func NewConcurrencyLimiter(max int) *ConcurrencyLimiter { - c := &ConcurrencyLimiter{ - inc: make(chan chan struct{}), - dec: make(chan struct{}, max), - max: max, - } - go c.handleLimits() - return c -} - -// Increment will increase the count of running goroutines by 1. -// if the number is currently at the max, the call to Increment -// will block until another goroutine decrements. -func (c *ConcurrencyLimiter) Increment() { - r := make(chan struct{}) - c.inc <- r - <-r -} - -// Decrement will reduce the count of running goroutines by 1 -func (c *ConcurrencyLimiter) Decrement() { - c.dec <- struct{}{} -} - -// handleLimits runs in a goroutine to manage the count of -// running goroutines. -func (c *ConcurrencyLimiter) handleLimits() { - for { - r := <-c.inc - if c.count >= c.max { - <-c.dec - c.count-- - } - c.count++ - r <- struct{}{} - } -} diff --git a/stress/runner_test.go b/stress/runner_test.go deleted file mode 100644 index 5fe2fbf015..0000000000 --- a/stress/runner_test.go +++ /dev/null @@ -1,246 +0,0 @@ -package runner - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/influxdb/influxdb/client" -) - -func TestTimer_StartTimer(t *testing.T) { - var epoch time.Time - - tmr := &Timer{} - - tmr.StartTimer() - - s := tmr.Start() - - if s == epoch { - t.Errorf("expected tmr.start to not be %v", s) - } -} - -func TestNewTimer(t *testing.T) { - var epoch time.Time - - tmr := NewTimer() - - s := tmr.Start() - - if s == epoch { - t.Errorf("expected tmr.start to not be %v", s) - } - - e := tmr.End() - - if e != epoch { - t.Errorf("expected tmr.stop to be %v, got %v", epoch, e) - } -} - -func TestTimer_StopTimer(t *testing.T) { - var epoch time.Time - - tmr := NewTimer() - - tmr.StopTimer() - - e := tmr.End() - - if e == epoch { - t.Errorf("expected tmr.stop to not be %v", e) - } -} - -func TestTimer_Elapsed(t *testing.T) { - - tmr := NewTimer() - time.Sleep(2 * time.Second) - tmr.StopTimer() - - e := tmr.Elapsed() - - if time.Duration(2*time.Second) > e || e > time.Duration(3*time.Second) { - t.Errorf("expected around %s got %s", time.Duration(2*time.Second), e) - } - -} - -func TestNewResponseTime(t *testing.T) { - r := NewResponseTime(100) - - if r.Value != 100 { - t.Errorf("expected Value to be %v, got %v", 100, r.Value) - } - - var epoch time.Time - - if r.Time == epoch { - t.Errorf("expected r.Time not to be %v", epoch) - } - -} - -func TestMeasurments_Set(t *testing.T) { - ms := make(Measurements, 0) - - ms.Set("this,is,a,test") - - if ms[0] != "this" { - t.Errorf("expected value to be %v, got %v", "this", ms[0]) - } - - if ms[1] != "is" { - t.Errorf("expected value to be %v, got %v", "is", ms[1]) - } - - ms.Set("more,here") - - if ms[4] != "more" { - t.Errorf("expected value to be %v, got %v", "more", ms[4]) - } - - if len(ms) != 6 { - t.Errorf("expected the length of ms to be %v, got %v", 6, len(ms)) - } - -} - -func TestNewSeries(t *testing.T) { - s := NewSeries("cpu", 1000, 10000) - - if s.PointCount != 1000 { - t.Errorf("expected value to be %v, got %v", 1000, s.PointCount) - } - - if s.SeriesCount != 10000 { - t.Errorf("expected value to be %v, got %v", 10000, s.SeriesCount) - } - - if s.Measurement != "cpu" { - t.Errorf("expected value to be %v, got %v", "cpu", s.Measurement) - } -} - -func TestNewConfig(t *testing.T) { - c := NewConfig() - - if c.Write.BatchSize != 5000 { - t.Errorf("expected value to be %v, got %v", 5000, c.Write.BatchSize) - } - -} - -func TestSeriesIter_Next(t *testing.T) { - s := NewSeries("cpu", 1000, 10000) - const shortForm = "2006-Jan-02" - tm, _ := time.Parse(shortForm, "2013-Feb-03") - i := s.Iter(10, tm, "n") - if i.count != -1 { - t.Errorf("expected value to be %v, go %v", -1, i.count) - } - - _, ok := i.Next() - - for ok { - _, ok = i.Next() - } - - if i.count != s.SeriesCount { - t.Errorf("expected value to be %v, go %v", s.SeriesCount, i.count) - } - -} - -func TestConfig_newClient(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("X-Influxdb-Version", "x.x") - return - })) - defer ts.Close() - - ms := make(Measurements, 0) - ms.Set("this,is,a,test") - - url := ts.URL[7:] - - cfg, err := DecodeFile("test.toml") - - if err != nil { - panic(err) - } - - cfg.Write.Address = url - - // the client.NewClient method in the influxdb go - // client never returns an error that is not nil. - // To test that the client actually gets created without - // any error, I run the Ping() method to verify that everything - // is okay. - c, err := cfg.NewClient() - - // This err will never be nil. See the following URL: - // https://github.com/influxdb/influxdb/blob/master/client/influxdb.go#L119 - if err != nil { - t.Error(err) - } - - // Verify that the client actually gets created correctly - d, version, err := c.Ping() - if err != nil { - t.Fatalf("unexpected error. expected %v, actual %v", nil, err) - } - if d == 0 { - t.Fatalf("expected a duration greater than zero. actual %v", d) - } - if version != "x.x" { - t.Fatalf("unexpected version. expected %s, actual %v", "x.x", version) - } -} - -func TestRun(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var data client.Response - w.WriteHeader(http.StatusOK) - _ = json.NewEncoder(w).Encode(data) - })) - defer ts.Close() - - ms := make(Measurements, 0) - ms.Set("this,is,a,test") - - url := ts.URL[7:] - - cfg, _ := DecodeFile("test.toml") - - cfg.Write.Address = url - - d := make(chan struct{}) - timestamp := make(chan time.Time) - - tp, _, rts, tmr := Run(cfg, d, timestamp) - - ps := cfg.Series[0].SeriesCount * cfg.Series[0].PointCount - - if tp != ps { - t.Fatalf("unexpected error. expected %v, actual %v", ps, tp) - } - - if len(rts) != ps/cfg.Write.BatchSize { - t.Fatalf("unexpected error. expected %v, actual %v", ps/cfg.Write.BatchSize, len(rts)) - } - - var epoch time.Time - - if tmr.Start() == epoch { - t.Errorf("expected trm.start not to be %s", epoch) - } - - if tmr.End() == epoch { - t.Errorf("expected trm.end not to be %s", epoch) - } -} diff --git a/stress/stress.toml b/stress/stress.toml new file mode 100644 index 0000000000..f9fe4b5734 --- /dev/null +++ b/stress/stress.toml @@ -0,0 +1,53 @@ +[provision] + [provision.basic] + enabled = true + address = "localhost:8086" + database = "stress" + reset_database = true + +[write] + [write.point_generator] + [write.point_generator.basic] + enabled = true + point_count = 100 + series_count = 100000 + tick = "10s" + jitter = true + measurement = "cpu" + start_date = "2006-Jan-02" + [[write.point_generator.basic.tag]] + key = "host" + value = "server" + [[write.point_generator.basic.tag]] + key = "location" + value = "us-west" + [[write.point_generator.basic.field]] + key = "value" + value = "float64" + + + [write.influx_client] + [write.influx_client.basic] + enabled = true + address = "localhost:8086" # stress_test_server runs on port 1234 + database = "stress" + precision = "n" + batch_size = 10000 + batch_interval = "0s" + concurrency = 10 + ssl = false + format = "line_http" # line_udp, graphite_tcp, graphite_udp + +[read] + [read.query_generator] + [read.query_generator.basic] + template = "SELECT count(value) FROM cpu where host='server-%v'" + query_count = 250 + + [read.query_client] + [read.query_client.basic] + address = "localhost:8086" + database = "stress" + query_interval = "100ms" + concurrency = 1 + diff --git a/stress/stress_test.go b/stress/stress_test.go new file mode 100644 index 0000000000..a3b7c2b18f --- /dev/null +++ b/stress/stress_test.go @@ -0,0 +1,597 @@ +package stress + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "regexp" + "strings" + "testing" + "time" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/influxdb/models" +) + +func TestTimer_StartTimer(t *testing.T) { + var epoch time.Time + tmr := &Timer{} + tmr.StartTimer() + s := tmr.Start() + if s == epoch { + t.Errorf("expected tmr.start to not be %v", s) + } +} + +func TestNewTimer(t *testing.T) { + var epoch time.Time + tmr := NewTimer() + s := tmr.Start() + if s == epoch { + t.Errorf("expected tmr.start to not be %v", s) + } + e := tmr.End() + if e != epoch { + t.Errorf("expected tmr.stop to be %v, got %v", epoch, e) + } +} + +func TestTimer_StopTimer(t *testing.T) { + var epoch time.Time + tmr := NewTimer() + tmr.StopTimer() + e := tmr.End() + if e == epoch { + t.Errorf("expected tmr.stop to not be %v", e) + } +} + +func TestTimer_Elapsed(t *testing.T) { + tmr := NewTimer() + time.Sleep(2 * time.Second) + tmr.StopTimer() + e := tmr.Elapsed() + if time.Duration(2*time.Second) > e || e > time.Duration(3*time.Second) { + t.Errorf("expected around %s got %s", time.Duration(2*time.Second), e) + } +} + +/// basic.go + +// Types are off +func Test_typeArr(t *testing.T) { + var re *regexp.Regexp + var b bool + arr := []string{ + "float64", + "int", + "bool", + } + + ts := typeArr(arr) + + re = regexp.MustCompile(`[1-9]\d*`) + b = re.MatchString(ts[0].(string)) + if !b { + t.Errorf("Expected line protocol float64 got %v", ts[0]) + } + + re = regexp.MustCompile(`[1-9]\d*i`) + b = re.MatchString(ts[1].(string)) + if !b { + t.Errorf("Expected line protocol int got %v", ts[1]) + } + + re = regexp.MustCompile(`true|false`) + b = re.MatchString(ts[2].(string)) + if !b { + t.Errorf("Expected line protocol bool got %v", ts[2]) + } + +} + +func Test_typeArrBadTypes(t *testing.T) { + arr := []string{ + "default", + "rand", + "", + } + + ts := typeArr(arr) + + for _, x := range ts { + re := regexp.MustCompile(`[1-9]\d*`) + b := re.MatchString(x.(string)) + if !b { + t.Errorf("Expected line protocol float64 got %v", x) + } + } +} + +func TestPnt_Line(t *testing.T) { + p := &Pnt{} + b := []byte("a,b=1,c=1 v=1") + + p.Set(b) + + if string(p.Line()) != string(b) { + t.Errorf("Expected `%v` to `%v`", string(b), string(p.Line())) + } +} + +func TestAbstractTags_Template(t *testing.T) { + tags := AbstractTags{ + AbstractTag{ + Key: "host", + Value: "server", + }, + AbstractTag{ + Key: "location", + Value: "us-west", + }, + } + + s := tags.Template() + tm := "host=server-%v,location=us-west" + + if s != tm { + t.Errorf("Expected %v got %v", tm, s) + } +} + +func TestAbstractFields_TemplateOneField(t *testing.T) { + fields := AbstractFields{ + AbstractField{ + Key: "fValue", + Type: "float64", + }, + } + + tm, _ := fields.Template() + + s := "fValue=%v" + if s != tm { + t.Errorf("Expected `%v` got `%v`", s, tm) + } + +} + +func TestAbstractFields_TemplateManyFields(t *testing.T) { + fields := AbstractFields{ + AbstractField{ + Key: "fValue", + Type: "float64", + }, + AbstractField{ + Key: "iValue", + Type: "int", + }, + AbstractField{ + Key: "bValue", + Type: "bool", + }, + AbstractField{ + Key: "rValue", + Type: "rnd", + }, + } + + tm, ty := fields.Template() + + s := "fValue=%v,iValue=%v,bValue=%v,rValue=%v" + if s != tm { + t.Errorf("Expected `%v` got `%v`", s, tm) + } + + for i, f := range fields { + if f.Type != ty[i] { + t.Errorf("Expected %v got %v", f.Type, ty[i]) + } + } + +} + +var basicPG = &BasicPointGenerator{ + PointCount: 100, + Tick: "10s", + Measurement: "cpu", + SeriesCount: 100, + Tags: AbstractTags{ + AbstractTag{ + Key: "host", + Value: "server", + }, + AbstractTag{ + Key: "location", + Value: "us-west", + }, + }, + Fields: AbstractFields{ + AbstractField{ + Key: "value", + Type: "float64", + }, + }, + StartDate: "2006-Jan-01", +} + +func TestBasicPointGenerator_Template(t *testing.T) { + fn := basicPG.Template() + now := time.Now() + m := "cpu,host=server-1,location=us-west" + ts := fmt.Sprintf("%v", now.UnixNano()) + + tm := strings.Split(string(fn(1, now).Line()), " ") + + if m != tm[0] { + t.Errorf("Expected %s got %s", m, tm[0]) + } + + if !strings.HasPrefix(tm[1], "value=") { + t.Errorf("Expected %v to start with `value=`", tm[1]) + } + + if ts != string(tm[2]) { + t.Errorf("Expected %s got %s", ts, tm[2]) + } +} + +func TestBasicPointGenerator_Generate(t *testing.T) { + ps, err := basicPG.Generate() + if err != nil { + t.Error(err) + } + + var buf bytes.Buffer + + for p := range ps { + b := p.Line() + + buf.Write(b) + buf.Write([]byte("\n")) + } + + bs := buf.Bytes() + bs = bs[0 : len(bs)-1] + + _, err = models.ParsePoints(bs) + if err != nil { + t.Error(err) + } +} + +func Test_post(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + content, _ := ioutil.ReadAll(r.Body) + lines := strings.Split(string(content), "\n") + if len(lines) != 3 { + t.Errorf("Expected 3 lines got %v", len(lines)) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + b := []byte( + `cpu,host=server-1,location=us-west value=100 12932 + cpu,host=server-2,location=us-west value=10 12932 + cpu,host=server-3,location=us-west value=120 12932`, + ) + + _, err := post(ts.URL, "application/x-www-form-urlencoded", bytes.NewBuffer(b)) + if err != nil { + t.Error(err) + } +} + +var basicIC = &BasicClient{ + Address: "localhost:8086", + Database: "stress", + Precision: "n", + BatchSize: 1000, + BatchInterval: "0s", + Concurrency: 10, + Format: "line_http", +} + +func TestBasicClient_send(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + content, _ := ioutil.ReadAll(r.Body) + lines := strings.Split(string(content), "\n") + if len(lines) != 3 { + t.Errorf("Expected 3 lines got %v", len(lines)) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + basicIC.Address = ts.URL[7:] + b := []byte( + `cpu,host=server-1,location=us-west value=100 12932 + cpu,host=server-2,location=us-west value=10 12932 + cpu,host=server-3,location=us-west value=120 12932`, + ) + _, err := basicIC.send(b) + if err != nil { + t.Error(err) + } + +} + +func TestBasicClient_Batch(t *testing.T) { + c := make(chan Point, 0) + r := make(chan response, 0) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + content, _ := ioutil.ReadAll(r.Body) + lines := strings.Split(string(content), "\n") + if len(lines) != 1000 { + t.Errorf("Expected 1000 lines got %v", len(lines)) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + basicIC.Address = ts.URL[7:] + + go func(c chan Point) { + defer close(c) + + for i := 0; i < 1000; i++ { + p := &Pnt{} + p.Next(i, time.Now()) + c <- *p + } + + }(c) + + go func(r chan response) { + for _ = range r { + } + }(r) + + err := basicIC.Batch(c, r) + close(r) + if err != nil { + t.Error(err) + } + +} + +var basicQ = &BasicQuery{ + Template: Query("SELECT count(value) from cpu WHERE host='server-%v'"), + QueryCount: 100, +} + +func TestBasicQuery_QueryGenerate(t *testing.T) { + qs, _ := basicQ.QueryGenerate(time.Now) + + i := 0 + for q := range qs { + tm := fmt.Sprintf(string(basicQ.Template), i) + if Query(tm) != q { + t.Errorf("Expected %v to be %v", q, tm) + } + i++ + } +} + +var basicQC = &BasicQueryClient{ + Address: "localhost:8086", + Database: "stress", + QueryInterval: "10s", + Concurrency: 1, +} + +func TestBasicQueryClient_Query(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Influxdb-Version", "x.x") + var data client.Response + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(data) + + return + })) + defer ts.Close() + + basicQC.Address = ts.URL[7:] + basicQC.Init() + + q := "SELECT count(value) FROM cpu" + r, err := basicQC.Query(Query(q)) + if err != nil { + t.Error(err) + } + + var epoch time.Time + + if r.Time == epoch { + t.Errorf("Expected %v to not be epoch", r.Time) + } + + elapsed := r.Timer.Elapsed() + if elapsed == time.Duration(0) { + t.Errorf("Expected %v to not be 0", elapsed) + } + +} + +/// config.go +func Test_NewConfigWithFile(t *testing.T) { + c, err := NewConfig("stress.toml") + if err != nil { + t.Error(err) + } + p := c.Provision + w := c.Write + r := c.Read + + if p.Basic.Address != "localhost:8086" { + t.Errorf("Expected `localhost:8086` got %s", p.Basic.Address) + } + if p.Basic.Database != "stress" { + t.Errorf("Expected `stress` got %s", p.Basic.Database) + } + if p.Basic.ResetDatabase != true { + t.Errorf("Expected true got %v", p.Basic.ResetDatabase) + } + + pg := w.PointGenerators.Basic + if pg.PointCount != 100 { + t.Errorf("Expected 100 got %v", pg.PointCount) + } + if pg.SeriesCount != 100000 { + t.Errorf("Expected 100000 got %v", pg.SeriesCount) + } + if pg.Tick != "10s" { + t.Errorf("Expected 10s got %s", pg.Tick) + } + if pg.Measurement != "cpu" { + t.Errorf("Expected cpu got %s", pg.Measurement) + } + if pg.StartDate != "2006-Jan-02" { + t.Errorf("Expected `2006-Jan-02` got `%s`", pg.StartDate) + } + // TODO: Check tags + // TODO: Check fields + + wc := w.InfluxClients.Basic + if wc.Address != "localhost:8086" { + t.Errorf("Expected `localhost:8086` got %s", wc.Address) + } + if wc.Database != "stress" { + t.Errorf("Expected stress got %s", wc.Database) + } + if wc.Precision != "n" { + t.Errorf("Expected n got %s", wc.Precision) + } + if wc.BatchSize != 10000 { + t.Errorf("Expected 10000 got %v", wc.BatchSize) + } + if wc.BatchInterval != "0s" { + t.Errorf("Expected 0s got %v", wc.BatchInterval) + } + if wc.Concurrency != 10 { + t.Errorf("Expected 10 got %v", wc.Concurrency) + } + if wc.SSL != false { + t.Errorf("Expected 10 got %v", wc.SSL) + } + if wc.Format != "line_http" { + t.Errorf("Expected `line_http` got %s", wc.Format) + } + + qg := r.QueryGenerators.Basic + if qg.Template != "SELECT count(value) FROM cpu where host='server-%v'" { + t.Errorf("Expected `SELECT count(value) FROM cpu where host='server-%%v'` got %s", qg.Template) + } + if qg.QueryCount != 250 { + t.Errorf("Expected 250 got %v", qg.QueryCount) + } + + qc := r.QueryClients.Basic + if qc.Address != "localhost:8086" { + t.Errorf("Expected `localhost:8086` got %s", qc.Address) + } + if qc.Database != "stress" { + t.Errorf("Expected stress got %s", qc.Database) + } + if qc.QueryInterval != "100ms" { + t.Errorf("Expected 100ms got %s", qc.QueryInterval) + } + if qc.Concurrency != 1 { + t.Errorf("Expected 1 got %v", qc.Concurrency) + } +} + +func Test_NewConfigWithoutFile(t *testing.T) { + c, err := NewConfig("") + if err != nil { + t.Error(err) + } + p := c.Provision + w := c.Write + r := c.Read + + if p.Basic.Address != "localhost:8086" { + t.Errorf("Expected `localhost:8086` got %s", p.Basic.Address) + } + if p.Basic.Database != "stress" { + t.Errorf("Expected `stress` got %s", p.Basic.Database) + } + if p.Basic.ResetDatabase != true { + t.Errorf("Expected true got %v", p.Basic.ResetDatabase) + } + + pg := w.PointGenerators.Basic + if pg.PointCount != 100 { + t.Errorf("Expected 100 got %v", pg.PointCount) + } + if pg.SeriesCount != 100000 { + t.Errorf("Expected 100000 got %v", pg.SeriesCount) + } + if pg.Tick != "10s" { + t.Errorf("Expected 10s got %s", pg.Tick) + } + if pg.Measurement != "cpu" { + t.Errorf("Expected cpu got %s", pg.Measurement) + } + if pg.StartDate != "2006-Jan-02" { + t.Errorf("Expected `2006-Jan-02` got `%s`", pg.StartDate) + } + // TODO: Check tags + // TODO: Check fields + + wc := w.InfluxClients.Basic + if wc.Address != "localhost:8086" { + t.Errorf("Expected `localhost:8086` got %s", wc.Address) + } + if wc.Database != "stress" { + t.Errorf("Expected stress got %s", wc.Database) + } + if wc.Precision != "n" { + t.Errorf("Expected n got %s", wc.Precision) + } + if wc.BatchSize != 10000 { + t.Errorf("Expected 10000 got %v", wc.BatchSize) + } + if wc.BatchInterval != "0s" { + t.Errorf("Expected 0s got %v", wc.BatchInterval) + } + if wc.Concurrency != 10 { + t.Errorf("Expected 10 got %v", wc.Concurrency) + } + if wc.SSL != false { + t.Errorf("Expected 10 got %v", wc.SSL) + } + if wc.Format != "line_http" { + t.Errorf("Expected `line_http` got %s", wc.Format) + } + + qg := r.QueryGenerators.Basic + if qg.Template != "SELECT count(value) FROM cpu where host='server-%v'" { + t.Errorf("Expected `SELECT count(value) FROM cpu where host='server-%%v'` got %s", qg.Template) + } + if qg.QueryCount != 250 { + t.Errorf("Expected 250 got %v", qg.QueryCount) + } + + qc := r.QueryClients.Basic + if qc.Address != "localhost:8086" { + t.Errorf("Expected `localhost:8086` got %s", qc.Address) + } + if qc.Database != "stress" { + t.Errorf("Expected stress got %s", qc.Database) + } + if qc.QueryInterval != "100ms" { + t.Errorf("Expected 100ms got %s", qc.QueryInterval) + } + if qc.Concurrency != 1 { + t.Errorf("Expected 1 got %v", qc.Concurrency) + } +} + +/// run.go +// TODO diff --git a/stress/stress_test_server/server.go b/stress/stress_test_server/server.go new file mode 100644 index 0000000000..39a419eaf5 --- /dev/null +++ b/stress/stress_test_server/server.go @@ -0,0 +1,73 @@ +package main + +import ( + "expvar" + "fmt" + "github.com/paulbellamy/ratecounter" + "io" + "io/ioutil" + "net/http" + "strings" + "sync" + "time" +) + +var ( + counter *ratecounter.RateCounter + hitspersecond = expvar.NewInt("hits_per_second") + mu sync.Mutex + m sync.Mutex +) + +// Query handles /query endpoint +func Query(w http.ResponseWriter, req *http.Request) { + io.WriteString(w, "du") +} + +// Count handles /count endpoint +func Count(w http.ResponseWriter, req *http.Request) { + io.WriteString(w, fmt.Sprintf("%v", linecount)) +} + +var n int +var linecount int + +// Write handles /write endpoints +func Write(w http.ResponseWriter, req *http.Request) { + mu.Lock() + n++ + mu.Unlock() + + counter.Incr(1) + hitspersecond.Set(counter.Rate()) + w.WriteHeader(http.StatusNoContent) + fmt.Printf("Reqests Per Second: %v\n", hitspersecond) + fmt.Printf("Count: %v\n", n) + + content, _ := ioutil.ReadAll(req.Body) + m.Lock() + arr := strings.Split(string(content), "\n") + linecount += len(arr) + m.Unlock() + + fmt.Printf("Line Count: %v\n\n", linecount) +} + +func init() { + n = 0 + linecount = 0 + counter = ratecounter.NewRateCounter(1 * time.Second) +} + +func main() { + mux := http.NewServeMux() + mux.HandleFunc("/query", Query) + mux.HandleFunc("/write", Write) + mux.HandleFunc("/count", Count) + + err := http.ListenAndServe(":1234", mux) + if err != nil { + fmt.Println("Fatal") + } + +} diff --git a/stress/template.go b/stress/template.go new file mode 100644 index 0000000000..3707bc4299 --- /dev/null +++ b/stress/template.go @@ -0,0 +1,63 @@ +package stress + +var s = ` +[provision] + [provision.basic] + enabled = true + address = "localhost:8086" + database = "stress" + reset_database = true + +[write] + [write.point_generator] + [write.point_generator.basic] + enabled = true + point_count = 100 + series_count = 100000 + tick = "10s" + jitter = true + measurement = "cpu" + start_date = "2006-Jan-02" + [[write.point_generator.basic.tag]] + key = "host" + value = "server" + [[write.point_generator.basic.tag]] + key = "location" + value = "us-west" + [[write.point_generator.basic.field]] + key = "value" + value = "float64" + + + [write.influx_client] + [write.influx_client.basic] + enabled = true + #address = "localhost:1234" + address = "localhost:8086" + database = "stress" + precision = "n" + batch_size = 10000 + batch_interval = "0s" + concurrency = 10 + ssl = false + format = "line_http" # line_udp, graphite_tcp, graphite_udp + +[read] + [read.query_generator] + [read.query_generator.basic] + template = "SELECT count(value) FROM cpu where host='server-%v'" + query_count = 250 + + [read.query_client] + [read.query_client.basic] + address = "localhost:8086" + database = "stress" + query_interval = "100ms" + concurrency = 1 +` + +// BasicStress returns a config for a basic +// stress test. +func BasicStress() (*Config, error) { + return DecodeConfig(s) +} diff --git a/stress/test.toml b/stress/test.toml deleted file mode 100644 index c008366ba2..0000000000 --- a/stress/test.toml +++ /dev/null @@ -1,43 +0,0 @@ -channel_buffer_size = 100 - -[write] - concurrency = 10 - batch_size = 10000 - batch_interval = "0s" - database = "stress" - precision = "s" - address = "localhost:8086" - reset_database = true - start_date = "2006-Jan-02" - -[[series]] - tick = "10s" - point_count = 10 # number of points that will be written for each of the series - measurement = "cpu" - series_count = 10000 - - tag_count = 0 # - - [[series.tag]] - key = "host" - value = "idk" - - [[series.tag]] - key = "location" - value = "lame" - - [[series.field]] - key = "value" - type = "float64" - - [[series.field]] - key = "percent" - type = "int" - - [[series.field]] - key = "idk" - type = "bool" - - [[series.field]] - key = "default" - diff --git a/stress/util.go b/stress/util.go new file mode 100644 index 0000000000..e3000340ba --- /dev/null +++ b/stress/util.go @@ -0,0 +1,132 @@ +package stress + +import ( + "time" +) + +// Timer is struct that can be used to track elaspsed time +type Timer struct { + start time.Time + end time.Time +} + +// Start returns a Timers start field +func (t *Timer) Start() time.Time { + return t.start +} + +// End returns a Timers end field +func (t *Timer) End() time.Time { + return t.end +} + +// StartTimer sets a timers `start` field to the current time +func (t *Timer) StartTimer() { + t.start = time.Now() +} + +// StopTimer sets a timers `end` field to the current time +func (t *Timer) StopTimer() { + t.end = time.Now() +} + +// Elapsed returns the total elapsed time between the `start` +// and `end` fields on a timer. +func (t *Timer) Elapsed() time.Duration { + return t.end.Sub(t.start) +} + +// NewTimer returns a pointer to a `Timer` struct where the +// timers `start` field has been set to `time.Now()` +func NewTimer() *Timer { + t := &Timer{} + t.StartTimer() + return t +} + +// ResponseTime is a struct that contains `Value` +// `Time` pairing. +type ResponseTime struct { + Value int + Time time.Time +} + +// NewResponseTime returns a new response time +// with value `v` and time `time.Now()`. +func NewResponseTime(v int) ResponseTime { + r := ResponseTime{Value: v, Time: time.Now()} + return r +} + +// ResponseTimes is a slice of response times +type ResponseTimes []ResponseTime + +// Implements the `Len` method for the +// sort.Interface type +func (rs ResponseTimes) Len() int { + return len(rs) +} + +// Implements the `Less` method for the +// sort.Interface type +func (rs ResponseTimes) Less(i, j int) bool { + return rs[i].Value < rs[j].Value +} + +// Implements the `Swap` method for the +// sort.Interface type +func (rs ResponseTimes) Swap(i, j int) { + rs[i], rs[j] = rs[j], rs[i] +} + +////////////////////////////////// + +// ConcurrencyLimiter is a go routine safe struct that can be used to +// ensure that no more than a specifid max number of goroutines are +// executing. +type ConcurrencyLimiter struct { + inc chan chan struct{} + dec chan struct{} + max int + count int +} + +// NewConcurrencyLimiter returns a configured limiter that will +// ensure that calls to Increment will block if the max is hit. +func NewConcurrencyLimiter(max int) *ConcurrencyLimiter { + c := &ConcurrencyLimiter{ + inc: make(chan chan struct{}), + dec: make(chan struct{}, max), + max: max, + } + go c.handleLimits() + return c +} + +// Increment will increase the count of running goroutines by 1. +// if the number is currently at the max, the call to Increment +// will block until another goroutine decrements. +func (c *ConcurrencyLimiter) Increment() { + r := make(chan struct{}) + c.inc <- r + <-r +} + +// Decrement will reduce the count of running goroutines by 1 +func (c *ConcurrencyLimiter) Decrement() { + c.dec <- struct{}{} +} + +// handleLimits runs in a goroutine to manage the count of +// running goroutines. +func (c *ConcurrencyLimiter) handleLimits() { + for { + r := <-c.inc + if c.count >= c.max { + <-c.dec + c.count-- + } + c.count++ + r <- struct{}{} + } +}