kapacitor fixes for stress

pull/5997/head
Nathaniel Cook 2016-03-11 17:34:34 -07:00
parent 3d544a9136
commit 2ce94c4ee9
2 changed files with 56 additions and 32 deletions

View File

@ -193,7 +193,13 @@ func (b *BasicPointGenerator) Generate() (<-chan Point, error) {
go func(c chan Point) {
defer close(c)
start, err := time.Parse("2006-Jan-02", b.StartDate)
var start time.Time
var err error
if b.StartDate == "now" {
start = time.Now()
} else {
start, err = time.Parse("2006-Jan-02", b.StartDate)
}
if err != nil {
fmt.Println(err)
return
@ -241,6 +247,7 @@ type BasicClient struct {
Enabled bool `toml:"enabled"`
Addresses []string `toml:"addresses"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
Precision string `toml:"precision"`
BatchSize int `toml:"batch_size"`
BatchInterval string `toml:"batch_interval"`
@ -272,7 +279,7 @@ func (c *BasicClient) Batch(ps <-chan Point, r chan<- response) error {
}
instanceURLs := make([]string, len(c.Addresses))
for i := 0; i < len(c.Addresses); i++ {
instanceURLs[i] = fmt.Sprintf("http://%v/write?db=%v&precision=%v", c.Addresses[i], c.Database, c.Precision)
instanceURLs[i] = fmt.Sprintf("http://%v/write?db=%v&rp=%v&precision=%v", c.Addresses[i], c.Database, c.RetentionPolicy, c.Precision)
}
c.Addresses = instanceURLs
@ -290,6 +297,17 @@ func (c *BasicClient) Batch(ps <-chan Point, r chan<- response) error {
ctr := 0
writeBatch := func(b []byte) {
wg.Add(1)
counter.Increment()
go func(byt []byte) {
c.retry(byt, time.Duration(1))
counter.Decrement()
wg.Done()
}(b)
}
for p := range ps {
b := p.Line()
c.addrId = ctr % len(c.Addresses)
@ -300,22 +318,22 @@ func (c *BasicClient) Batch(ps <-chan Point, r chan<- response) error {
if ctr%c.BatchSize == 0 && ctr != 0 {
b := buf.Bytes()
if len(b) == 0 {
continue
}
// Trimming the trailing newline character
b = b[0 : len(b)-1]
wg.Add(1)
counter.Increment()
go func(byt []byte) {
c.retry(byt, time.Duration(1))
counter.Decrement()
wg.Done()
}(b)
writeBatch(b)
var temp bytes.Buffer
buf = temp
}
}
// Write out any remaining points
b := buf.Bytes()
if len(b) > 0 {
writeBatch(b)
}
wg.Wait()
@ -656,6 +674,7 @@ func (o *outputConfig) HTTPHandler(method string) func(r <-chan response, rt *Ti
})
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: o.database,
RetentionPolicy: o.retentionPolicy,
Precision: "ns",
})
for p := range r {
@ -673,6 +692,7 @@ func (o *outputConfig) HTTPHandler(method string) func(r <-chan response, rt *Ti
o.mu.Lock()
bp, _ = client.NewBatchPoints(client.BatchPointsConfig{
Database: o.database,
RetentionPolicy: o.retentionPolicy,
Precision: "ns",
})
o.mu.Unlock()

View File

@ -3,9 +3,10 @@ package stress
import (
"flag"
"fmt"
"github.com/BurntSushi/toml"
"strings"
"sync"
"github.com/BurntSushi/toml"
)
// Config is a struct for the Stress test configuration
@ -103,12 +104,14 @@ type outputConfig struct {
tags map[string]string
addr string
database string
retentionPolicy string
mu sync.Mutex
}
func (t *outputConfig) SetParams(addr, db string) {
func (t *outputConfig) SetParams(addr, db, rp string) {
t.addr = addr
t.database = db
t.retentionPolicy = rp
}
func NewOutputConfig() *outputConfig {
@ -116,11 +119,12 @@ func NewOutputConfig() *outputConfig {
tags := make(map[string]string)
o.tags = tags
database := flag.String("database", "stress", "name of database where the response times will persist")
retentionPolicy := flag.String("retention-policy", "", "name of the retention policy where the response times will persist")
address := flag.String("addr", "http://localhost:8086", "IP address and port of database where response times will persist (e.g., localhost:8086)")
flag.Var(&o, "tags", "A comma seperated list of tags")
flag.Parse()
o.SetParams(*address, *database)
o.SetParams(*address, *database, *retentionPolicy)
return &o
@ -131,7 +135,7 @@ func (t *outputConfig) String() string {
for k, v := range t.tags {
s += fmt.Sprintf("%v=%v ", k, v)
}
return fmt.Sprintf("%v %v %v", s, t.database, t.addr)
return fmt.Sprintf("%v %v %v %v", s, t.database, t.retentionPolicy, t.addr)
}
func (t *outputConfig) Set(value string) error {