Merge commit 'b1be2d5' into md-stress-http
commit
35f45841ce
|
@ -588,7 +588,7 @@ func (b *BroadcastChannel) Handle(rs <-chan response, t *Timer) {
|
|||
}
|
||||
|
||||
// BasicWriteHandler handles write responses.
|
||||
func BasicWriteHandler(rs <-chan response, wt *Timer) {
|
||||
func (b *BasicClient) BasicWriteHandler(rs <-chan response, wt *Timer) {
|
||||
n := 0
|
||||
success := 0
|
||||
fail := 0
|
||||
|
@ -617,11 +617,11 @@ func BasicWriteHandler(rs <-chan response, wt *Timer) {
|
|||
fmt.Printf(" Success: %v\n", success)
|
||||
fmt.Printf(" Fail: %v\n", fail)
|
||||
fmt.Printf("Average Response Time: %v\n", s/time.Duration(n))
|
||||
fmt.Printf("Points Per Second: %v\n\n", float64(n)*float64(10000)/float64(wt.Elapsed().Seconds()))
|
||||
fmt.Printf("Points Per Second: %v\n\n", float64(n)*float64(b.BatchSize)/float64(wt.Elapsed().Seconds()))
|
||||
}
|
||||
|
||||
// BasicReadHandler handles read responses.
|
||||
func BasicReadHandler(r <-chan response, rt *Timer) {
|
||||
func (b *BasicQueryClient) BasicReadHandler(r <-chan response, rt *Timer) {
|
||||
n := 0
|
||||
s := time.Duration(0)
|
||||
for t := range r {
|
||||
|
@ -637,16 +637,18 @@ func BasicReadHandler(r <-chan response, rt *Timer) {
|
|||
fmt.Printf("Average Query Response Time: %v\n\n", s/time.Duration(n))
|
||||
}
|
||||
|
||||
func WriteHTTPHandler(r <-chan response, rt *Timer) {
|
||||
func (o *outputConfig) HTTPHandler(method string) func(r <-chan response, rt *Timer) {
|
||||
return func(r <-chan response, rt *Timer) {
|
||||
c, _ := client.NewHTTPClient(client.HTTPConfig{
|
||||
Addr: "http://localhost:8086",
|
||||
Addr: o.addr,
|
||||
})
|
||||
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
|
||||
Database: "stress",
|
||||
Database: o.database,
|
||||
Precision: "ns",
|
||||
})
|
||||
for p := range r {
|
||||
tags := map[string]string{"test": "foo"}
|
||||
tags := o.tags
|
||||
tags["method"] = method
|
||||
fields := map[string]interface{}{
|
||||
"response_time": p.Timer.Elapsed(),
|
||||
}
|
||||
|
@ -655,9 +657,14 @@ func WriteHTTPHandler(r <-chan response, rt *Timer) {
|
|||
if len(bp.Points())%1000 == 0 && len(bp.Points()) != 0 {
|
||||
c.Write(bp)
|
||||
bp, _ = client.NewBatchPoints(client.BatchPointsConfig{
|
||||
Database: "stress",
|
||||
Database: o.database,
|
||||
Precision: "ns",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if len(bp.Points()) != 0 {
|
||||
c.Write(bp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package stress
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/BurntSushi/toml"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Config is a struct for the Stress test configuration
|
||||
|
@ -94,3 +97,45 @@ func DecodeConfig(s string) (*Config, error) {
|
|||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
type outputConfig struct {
|
||||
tags map[string]string
|
||||
addr string
|
||||
database string
|
||||
}
|
||||
|
||||
func (t *outputConfig) SetParams(addr, db string) {
|
||||
t.addr = addr
|
||||
t.database = db
|
||||
}
|
||||
|
||||
func NewOutputConfig() *outputConfig {
|
||||
var o outputConfig
|
||||
tags := make(map[string]string)
|
||||
o.tags = tags
|
||||
database := flag.String("database", "stress", "name of database")
|
||||
address := flag.String("addr", "http://localhost:8086", "IP address and port of database where response times will persist (e.g., localhost:8086)")
|
||||
flag.Var(&o, "tags", "A comma seperated list of tags")
|
||||
flag.Parse()
|
||||
|
||||
o.SetParams(*address, *database)
|
||||
|
||||
return &o
|
||||
|
||||
}
|
||||
|
||||
func (t *outputConfig) String() string {
|
||||
var s string
|
||||
for k, v := range t.tags {
|
||||
s += fmt.Sprintf("%v=%v ", k, v)
|
||||
}
|
||||
return fmt.Sprintf("%v %v %v", s, t.database, t.addr)
|
||||
}
|
||||
|
||||
func (t *outputConfig) Set(value string) error {
|
||||
for _, s := range strings.Split(value, ",") {
|
||||
tags := strings.Split(s, "=")
|
||||
t.tags[tags[0]] = tags[1]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue