parent
d6027071d2
commit
f056753afe
|
@ -2,7 +2,7 @@ channel_buffer_size = 100000
|
|||
|
||||
[write]
|
||||
concurrency = 10
|
||||
batch_size = 10000
|
||||
batch_size = 5000
|
||||
batch_interval = "0s"
|
||||
database = "stress"
|
||||
precision = "n"
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
channel_buffer_size = 100
|
||||
|
||||
[write]
|
||||
concurrency = 10
|
||||
batch_size = 5000
|
||||
batch_interval = "4s"
|
||||
database = "stress"
|
||||
precision = "n"
|
||||
address = "localhost:8086"
|
||||
reset_database = true
|
||||
starting_point = 4 # how far back in time to go in weeks
|
||||
|
||||
[[series]]
|
||||
point_count = 10000 # number of points that will be written for each of the series
|
||||
measurement = "cpu"
|
||||
series_count = 100000
|
||||
|
||||
[[series.tag]]
|
||||
key = "host"
|
||||
value = "server"
|
||||
|
||||
[[series.tag]]
|
||||
key = "location"
|
||||
value = "loc"
|
||||
|
||||
[[series.field]]
|
||||
key = "value"
|
||||
type = "float64"
|
||||
|
|
@ -1,12 +1,12 @@
|
|||
package runner
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
// tag is a struct that contains data
|
||||
|
@ -222,14 +222,66 @@ func (s *series) newFieldMap() map[string]interface{} {
|
|||
|
||||
// Next returns a new point for a series.
|
||||
// Currently, there is an off by one bug here.
|
||||
func (iter *seriesIter) Next() (*client.Point, bool) {
|
||||
//func (iter *seriesIter) Next() (*client.Point, bool) {
|
||||
// iter.count++
|
||||
// p := client.Point{
|
||||
// Measurement: iter.s.Measurement,
|
||||
// Tags: iter.s.newTagMap(iter.count),
|
||||
// Fields: iter.s.newFieldMap(),
|
||||
// Time: iter.timestamp,
|
||||
// }
|
||||
// b := iter.count < iter.s.SeriesCount
|
||||
// return &p, b
|
||||
//}
|
||||
|
||||
func (iter *seriesIter) Next() ([]byte, bool) {
|
||||
var buf bytes.Buffer
|
||||
iter.count++
|
||||
p := client.Point{
|
||||
Measurement: iter.s.Measurement,
|
||||
Tags: iter.s.newTagMap(iter.count),
|
||||
Fields: iter.s.newFieldMap(),
|
||||
Time: iter.timestamp,
|
||||
}
|
||||
|
||||
buf.Write([]byte(fmt.Sprintf("%v,", iter.s.Measurement)))
|
||||
buf.Write(iter.s.newTagSet(iter.count))
|
||||
buf.Write([]byte(" "))
|
||||
buf.Write(iter.s.newFieldSet())
|
||||
buf.Write([]byte(" "))
|
||||
buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.UnixNano())))
|
||||
|
||||
b := iter.count < iter.s.SeriesCount
|
||||
return &p, b
|
||||
byt := buf.Bytes()
|
||||
|
||||
return byt, b
|
||||
}
|
||||
|
||||
func (s *series) newTagSet(c int) []byte {
|
||||
var buf bytes.Buffer
|
||||
for _, tag := range s.Tags {
|
||||
buf.Write([]byte(fmt.Sprintf("%v=%v-%v,", tag.Key, tag.Value, c)))
|
||||
}
|
||||
|
||||
b := buf.Bytes()
|
||||
b = b[0 : len(b)-1]
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (s *series) newFieldSet() []byte {
|
||||
var buf bytes.Buffer
|
||||
|
||||
for _, field := range s.Fields {
|
||||
switch field.Type {
|
||||
case "float64":
|
||||
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, rand.Intn(1000))))
|
||||
case "int":
|
||||
buf.Write([]byte(fmt.Sprintf("%v=%vi,", field.Key, rand.Intn(1000))))
|
||||
case "bool":
|
||||
b := rand.Intn(2) == 1
|
||||
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, b)))
|
||||
default:
|
||||
buf.Write([]byte(fmt.Sprintf("%v=%v,", field.Key, rand.Intn(1000))))
|
||||
}
|
||||
}
|
||||
|
||||
b := buf.Bytes()
|
||||
b = b[0 : len(b)-1]
|
||||
|
||||
return b
|
||||
}
|
||||
|
|
|
@ -2,7 +2,11 @@ package runner
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
//"math/rand"
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -11,6 +15,26 @@ import (
|
|||
"github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
func post(url string, datatype string, data io.Reader) error {
|
||||
|
||||
resp, err := http.Post(url, datatype, data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf(string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Timer is struct that can be used to track elaspsed time
|
||||
type Timer struct {
|
||||
start time.Time
|
||||
|
@ -163,10 +187,11 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f
|
|||
|
||||
lastSuccess := true
|
||||
|
||||
ch := make(chan []client.Point, cfg.ChannelBufferSize)
|
||||
ch := make(chan []byte, cfg.ChannelBufferSize)
|
||||
|
||||
go func() {
|
||||
points := []client.Point{}
|
||||
var buf bytes.Buffer
|
||||
//points := []client.Point{}
|
||||
num := 0
|
||||
for _, s := range cfg.Series {
|
||||
num += s.PointCount * s.SeriesCount
|
||||
|
@ -185,22 +210,29 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f
|
|||
for ok {
|
||||
ctr++
|
||||
// add jitter
|
||||
if cfg.Write.Jitter != 0 {
|
||||
rnd := rand.Intn(cfg.Write.Jitter)
|
||||
if rnd%2 == 0 {
|
||||
rnd = -1 * rnd
|
||||
}
|
||||
p.Time = p.Time.Add(time.Duration(rnd))
|
||||
}
|
||||
points = append(points, *p)
|
||||
if len(points) >= cfg.Write.BatchSize {
|
||||
ch <- points
|
||||
points = []client.Point{}
|
||||
//if cfg.Write.Jitter != 0 {
|
||||
// rnd := rand.Intn(cfg.Write.Jitter)
|
||||
// if rnd%2 == 0 {
|
||||
// rnd = -1 * rnd
|
||||
// }
|
||||
// p.Time = p.Time.Add(time.Duration(rnd))
|
||||
//}
|
||||
buf.Write(p)
|
||||
buf.Write([]byte("\n"))
|
||||
//points = append(points, *p)
|
||||
if ctr != 0 && ctr%cfg.Write.BatchSize == 0 {
|
||||
b := buf.Bytes()
|
||||
|
||||
b = b[0 : len(b)-2]
|
||||
|
||||
ch <- b
|
||||
var b2 bytes.Buffer
|
||||
buf = b2
|
||||
}
|
||||
|
||||
if cfg.MeasurementQuery.Enabled && ctr%num == 0 {
|
||||
select {
|
||||
case ts <- p.Time:
|
||||
case ts <- time.Now():
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
@ -222,27 +254,31 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f
|
|||
timer = NewTimer()
|
||||
|
||||
for pnt := range ch {
|
||||
batch := &client.BatchPoints{
|
||||
Database: cfg.Write.Database,
|
||||
WriteConsistency: "any",
|
||||
Time: time.Now(),
|
||||
Precision: cfg.Write.Precision,
|
||||
Points: pnt,
|
||||
}
|
||||
// batch := &client.BatchPoints{
|
||||
// Database: cfg.Write.Database,
|
||||
// WriteConsistency: "any",
|
||||
// Time: time.Now(),
|
||||
// Precision: cfg.Write.Precision,
|
||||
// Points: pnt,
|
||||
// }
|
||||
|
||||
wg.Add(1)
|
||||
counter.Increment()
|
||||
totalPoints += len(batch.Points)
|
||||
//totalPoints += len(batch.Points)
|
||||
totalPoints += 5000
|
||||
|
||||
go func(b *client.BatchPoints, total int) {
|
||||
go func(b *bytes.Buffer, total int) {
|
||||
st := time.Now()
|
||||
if _, err := c.Write(*b); err != nil { // Should retry write if failed
|
||||
err := post("http://localhost:8086/write?db=stress", "application/x-www-form-urlencoded", b)
|
||||
if err != nil { // Should retry write if failed
|
||||
//if _, err := c.Write(*b); err != nil { // Should retry write if failed
|
||||
mu.Lock()
|
||||
if lastSuccess {
|
||||
fmt.Println("ERROR: ", err.Error())
|
||||
}
|
||||
failedRequests += 1
|
||||
totalPoints -= len(b.Points)
|
||||
//totalPoints -= len(b.Points)
|
||||
totalPoints -= 5000
|
||||
lastSuccess = false
|
||||
mu.Unlock()
|
||||
} else {
|
||||
|
@ -261,7 +297,7 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f
|
|||
if total%500000 == 0 {
|
||||
fmt.Printf("%d total points. %d in %s\n", total, cfg.Write.BatchSize, time.Since(st))
|
||||
}
|
||||
}(batch, totalPoints)
|
||||
}(bytes.NewBuffer(pnt), totalPoints)
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue