diff --git a/stress/config.go b/stress/config.go index 31b729f0c6..4b512a0a81 100644 --- a/stress/config.go +++ b/stress/config.go @@ -170,6 +170,7 @@ type seriesIter struct { s *series count int timestamp time.Time + precision string } // writeInterval returns a timestamp for the current time @@ -181,59 +182,13 @@ func (s *series) writeInterval(weeks int, i int) time.Time { } // Iter returns a pointer to a seriesIter -func (s *series) Iter(weeks int, i int) *seriesIter { +func (s *series) Iter(weeks int, i int, p string) *seriesIter { - return &seriesIter{s: s, count: -1, timestamp: s.writeInterval(weeks, i)} -} - -// newTagMap returns a tagset -func (s *series) newTagMap(i int) map[string]string { - m := map[string]string{} - - for _, tag := range s.Tags { - m[tag.Key] = fmt.Sprintf("%s-%d", tag.Value, i) - } - - return m -} - -// newFieldMap returns a new field set for -// a given series -func (s *series) newFieldMap() map[string]interface{} { - m := map[string]interface{}{} - - for _, field := range s.Fields { - switch field.Type { - case "float64": - m[field.Key] = float64(rand.Intn(1000)) - case "int": - m[field.Key] = rand.Intn(1000) - case "bool": - b := rand.Intn(2) == 1 - m[field.Key] = b - default: - m[field.Key] = float64(rand.Intn(1000)) - } - - } - - return m + return &seriesIter{s: s, count: -1, timestamp: s.writeInterval(weeks, i), precision: p} } // Next returns a new point for a series. // Currently, there is an off by one bug here. -//func (iter *seriesIter) Next() (*client.Point, bool) { -// iter.count++ -// p := client.Point{ -// Measurement: iter.s.Measurement, -// Tags: iter.s.newTagMap(iter.count), -// Fields: iter.s.newFieldMap(), -// Time: iter.timestamp, -// } -// b := iter.count < iter.s.SeriesCount -// return &p, b -//} - func (iter *seriesIter) Next() ([]byte, bool) { var buf bytes.Buffer iter.count++ @@ -243,7 +198,13 @@ func (iter *seriesIter) Next() ([]byte, bool) { buf.Write([]byte(" ")) buf.Write(iter.s.newFieldSet()) buf.Write([]byte(" ")) - buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.UnixNano()))) + + switch iter.precision { + case "s": + buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.Unix()))) + default: + buf.Write([]byte(fmt.Sprintf("%v", iter.timestamp.UnixNano()))) + } b := iter.count < iter.s.SeriesCount byt := buf.Bytes() @@ -251,6 +212,8 @@ func (iter *seriesIter) Next() ([]byte, bool) { return byt, b } +// newTagSet returns a byte array representation +// of the tagset for a series func (s *series) newTagSet(c int) []byte { var buf bytes.Buffer for _, tag := range s.Tags { @@ -263,6 +226,8 @@ func (s *series) newTagSet(c int) []byte { return b } +// newFieldSet returns a byte array representation +// of the field-set for a series func (s *series) newFieldSet() []byte { var buf bytes.Buffer diff --git a/stress/runner.go b/stress/runner.go index 8fac01f825..af75cc2ad5 100644 --- a/stress/runner.go +++ b/stress/runner.go @@ -15,6 +15,8 @@ import ( "github.com/influxdb/influxdb/client" ) +// TODO: Add jitter to timestamps + func post(url string, datatype string, data io.Reader) error { resp, err := http.Post(url, datatype, data) @@ -191,7 +193,6 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f go func() { var buf bytes.Buffer - //points := []client.Point{} num := 0 for _, s := range cfg.Series { num += s.PointCount * s.SeriesCount @@ -205,21 +206,12 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f ctr := 0 for _, testSeries := range cfg.Series { for i := 0; i < testSeries.PointCount; i++ { - iter := testSeries.Iter(cfg.Write.StartingPoint, i) + iter := testSeries.Iter(cfg.Write.StartingPoint, i, cfg.Write.Precision) p, ok := iter.Next() for ok { ctr++ - // add jitter - //if cfg.Write.Jitter != 0 { - // rnd := rand.Intn(cfg.Write.Jitter) - // if rnd%2 == 0 { - // rnd = -1 * rnd - // } - // p.Time = p.Time.Add(time.Duration(rnd)) - //} buf.Write(p) buf.Write([]byte("\n")) - //points = append(points, *p) if ctr != 0 && ctr%cfg.Write.BatchSize == 0 { b := buf.Bytes() @@ -254,31 +246,24 @@ func Run(cfg *Config, done chan struct{}, ts chan time.Time) (totalPoints int, f timer = NewTimer() for pnt := range ch { - // batch := &client.BatchPoints{ - // Database: cfg.Write.Database, - // WriteConsistency: "any", - // Time: time.Now(), - // Precision: cfg.Write.Precision, - // Points: pnt, - // } wg.Add(1) counter.Increment() - //totalPoints += len(batch.Points) - totalPoints += 5000 + totalPoints += cfg.Write.BatchSize + + instanceURL := fmt.Sprintf("http://%v/write?db=%v&precision=%v", cfg.Write.Address, cfg.Write.Database, cfg.Write.Precision) go func(b *bytes.Buffer, total int) { st := time.Now() - err := post("http://localhost:8086/write?db=stress", "application/x-www-form-urlencoded", b) + err := post(instanceURL, "application/x-www-form-urlencoded", b) if err != nil { // Should retry write if failed - //if _, err := c.Write(*b); err != nil { // Should retry write if failed mu.Lock() if lastSuccess { fmt.Println("ERROR: ", err.Error()) } failedRequests += 1 //totalPoints -= len(b.Points) - totalPoints -= 5000 + totalPoints -= cfg.Write.BatchSize lastSuccess = false mu.Unlock() } else { diff --git a/stress/runner_test.go b/stress/runner_test.go index 9691b6761b..874b5ddfae 100644 --- a/stress/runner_test.go +++ b/stress/runner_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "net/http" "net/http/httptest" - "reflect" "testing" "time" @@ -136,29 +135,9 @@ func TestNewConfig(t *testing.T) { } -func TestSeries_newTagMap(t *testing.T) { - s := NewSeries("cpu", 1000, 10000) - m := s.newTagMap(0) - - if m["host"] != "server-0" { - t.Errorf("expected value to be %v, go %v", "server-0", m["host"]) - } - -} - -func TestSeries_newFieldMap(t *testing.T) { - s := NewSeries("cpu", 1000, 10000) - m := s.newFieldMap() - - if reflect.TypeOf(m["value"]).Kind() != reflect.Float64 { - t.Errorf("expected type of value to be %v, go %v", reflect.Float64, reflect.TypeOf(m["value"]).Kind()) - } - -} - func TestSeriesIter_Next(t *testing.T) { s := NewSeries("cpu", 1000, 10000) - i := s.Iter(10, 0) + i := s.Iter(10, 0, "n") if i.count != -1 { t.Errorf("expected value to be %v, go %v", -1, i.count) }