282 lines
6.5 KiB
282 lines
6.5 KiB
package runner
import (
// tag is a struct that contains data
// about a tag for in a series
type tag struct {
Key string `toml:"key"`
Value string `toml:"value"`
// tag is a struct that contains data
// about a field for in a series
type field struct {
Key string `toml:"key"`
Type string `toml:"type"`
// series is a struct that contains data
// about the series that will be written
// during a stress test
type series struct {
PointCount int `toml:"point_count"`
Tick string `toml:"tick"`
Jitter bool `toml:"jitter"`
Measurement string `toml:"measurement"`
SeriesCount int `toml:"series_count"`
TagCount int `toml:"tag_count"`
Tags []tag `toml:"tag"`
Fields []field `toml:"field"`
// write is a struct that contains the business
// logic for the stress test. e.g. where the
// influxdb instance is running, what database
// should points be written into
type write struct {
Concurrency int `toml:"concurrency"`
BatchSize int `toml:"batch_size"`
BatchInterval string `toml:"batch_interval"`
Database string `toml:"database"`
ResetDatabase bool `toml:"reset_database"`
Address string `toml:"address"`
Precision string `toml:"precision"`
StartDate string `toml:"start_date"`
// query is a struct that contains the logic for
// a query that will be ran on during the stress
// test
type query struct {
Enabled bool `toml:"enabled"`
Concurrency int `toml:"concurrency"`
Aggregates []string `toml:"aggregates"`
Fields []string `toml:"fields"`
// measurementQuery is a struct that contains
// the logic that runs a query against a measurement
// over a time period that is specified by
// `Offset`
type measurementQuery struct {
Offset string `toml:"offset"`
// seriesQuery is a struct that contains
// the logic that runs a query against a single
// series
type seriesQuery struct {
Interval string `toml:"interval"`
// Config is a struct that is passed into the `Run()` function.
type Config struct {
Write write `toml:"write"`
Series []series `toml:"series"`
MeasurementQuery measurementQuery `toml:"measurement_query"`
SeriesQuery seriesQuery `toml:"series_query"`
ChannelBufferSize int `toml:"channel_buffer_size"`
SSL bool `toml:"ssl"`
// NewSeries takes a measurement, and point count,
// and a series count and returns a series
func NewSeries(m string, p int, sc int) series {
s := series{
PointCount: p,
SeriesCount: sc,
Tick: "1s",
Measurement: m,
Tags: []tag{
Key: "host",
Value: "server",
Fields: []field{
Key: "value",
return s
// NewConfig returns a pointer to a config
// with some default parameters set
func NewConfig() *Config {
w := write{
Concurrency: 10,
BatchSize: 5000,
BatchInterval: "0s",
Database: "stress",
ResetDatabase: true,
Address: "localhost:8086",
Precision: "n",
c := &Config{
Write: w,
return c
// DecodeFile takes a file path for a toml config file
// and returns a pointer to a Config Struct.
func DecodeFile(s string) (*Config, error) {
t := &Config{}
// Decode the toml file
if _, err := toml.DecodeFile(s, t); err != nil {
return nil, err
// Initialize Config struct
// NOTE: Not happy with the implementation
// but it will do for now
for j, srs := range t.Series {
for i := 0; i < srs.TagCount; i++ {
tag := tag{
Key: fmt.Sprintf("tag-key-%d", i),
Value: "tag-value",
srs.Tags = append(srs.Tags, tag)
t.Series[j] = srs
return t, nil
// seriesIter is a struct that contains a
// series and a count, where count is the
//number of points that have been written
// for the series `s`
type seriesIter struct {
s *series
count int
timestamp time.Time
precision string
// writeInterval returns a timestamp for the current time
// interval
func (s *series) writeInterval(i int, start time.Time) time.Time {
var tick time.Duration
var j int
var err error
tick, err = time.ParseDuration(s.Tick)
if err != nil {
if s.Jitter {
j = rand.Intn(int(tick))
if j%2 == 0 {
j = -2 * j
tick = tick*time.Duration(i) + time.Duration(j)
return start.Add(tick)
// Iter returns a pointer to a seriesIter
func (s *series) Iter(i int, start time.Time, p string) *seriesIter {
return &seriesIter{s: s, count: -1, timestamp: s.writeInterval(i, start), precision: p}
// Next returns a new point for a series.
// Currently, there is an off by one bug here.
func (iter *seriesIter) Next() ([]byte, bool) {
var buf bytes.Buffer
buf.Write([]byte(fmt.Sprintf("%v,", iter.s.Measurement)))
buf.Write([]byte(" "))
buf.Write([]byte(" "))
switch iter.precision {
case "s":
buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.Unix())))
buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.UnixNano())))
b := iter.count < iter.s.SeriesCount
byt := buf.Bytes()
return byt, b
// newTagSet returns a byte array representation
// of the tagset for a series
func (s *series) newTagSet(c int) []byte {
var buf bytes.Buffer
for _, tag := range s.Tags {
buf.Write([]byte(fmt.Sprintf("%v=%v-%v,", tag.Key, tag.Value, c)))
b := buf.Bytes()
b = b[0 : len(b)-1]
return b
// newFieldSet returns a byte array representation
// of the field-set for a series
func (s *series) newFieldSet(c int) []byte {
var buf bytes.Buffer
for _, field := range s.Fields {
switch field.Type {
case "float64-flat":
if rand.Intn(10) > 2 {
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, 100)))
} else {
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, 100+rand.Intn(100))))
case "float64-inc+":
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, c+rand.Intn(2))))
case "float64-inc":
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, c)))
case "float64":
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, rand.Intn(1000))))
case "int":
buf.Write([]byte(fmt.Sprintf("%v=%vi,", field.Key, rand.Intn(1000))))
case "bool":
b := rand.Intn(2) == 1
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, b)))
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, rand.Intn(1000))))
b := buf.Bytes()
b = b[0 : len(b)-1]
return b