Add broadcast channel for stress

pull/5062/head^2
Michael Desa 2015-12-08 11:10:44 -08:00
parent bcc1ad0d3f
commit c7709643b1
3 changed files with 83 additions and 58 deletions

View File

@ -11,7 +11,11 @@ import (
var (
//database = flag.String("database", "", "name of database")
//address = flag.String("addr", "", "IP address and port of database (e.g., localhost:8086)")
address = flag.String("addr", "", "IP address and port of database where response times will persist (e.g., localhost:8086)")
tags = flag.String("tags", "", "")
//id = flag.String("id", "", "ID for the test that is being ran")
//name = flag.String("name", "", "name of the test that is being ran")
config = flag.String("config", "", "The stress test file")
cpuprofile = flag.String("cpuprofile", "", "Write the cpu profile to `filename`")
@ -37,7 +41,17 @@ func main() {
return
}
stress.Run(c)
w := stress.NewWriter(&c.Write.PointGenerators.Basic, &c.Write.InfluxClients.Basic)
r := stress.NewQuerier(&c.Read.QueryGenerators.Basic, &c.Read.QueryClients.Basic)
s := stress.NewStressTest(&c.Provision.Basic, w, r)
bw := stress.NewBroadcastChannel()
bw.Register(stress.BasicWriteHandler)
br := stress.NewBroadcastChannel()
br.Register(stress.BasicReadHandler)
s.Start(bw.Handle, br.Handle)
return

View File

@ -7,7 +7,7 @@ import (
"io/ioutil"
"math/rand"
"net/http"
"net/url"
//"net/url"
"sync"
"time"
@ -521,6 +521,72 @@ func (b *BasicProvisioner) Provision() error {
return nil
}
type BroadcastChannel struct {
chs []chan response
wg sync.WaitGroup
fns []func(t *Timer)
}
func NewBroadcastChannel() *BroadcastChannel {
chs := make([]chan response, 0)
var wg sync.WaitGroup
b := &BroadcastChannel{
chs: chs,
wg: wg,
}
return b
}
func (b *BroadcastChannel) Register(fn responseHandler) {
ch := make(chan response, 0)
b.chs = append(b.chs, ch)
f := func(t *Timer) {
go fn(ch, t)
}
b.fns = append(b.fns, f)
}
func (b *BroadcastChannel) Broadcast(r response) {
b.wg.Add(1)
for _, ch := range b.chs {
b.wg.Add(1)
go func(ch chan response) {
ch <- r
b.wg.Done()
}(ch)
}
b.wg.Done()
}
func (b *BroadcastChannel) Close() {
b.wg.Wait()
for _, ch := range b.chs {
close(ch)
// Workaround
time.Sleep(1 * time.Second)
}
}
func (b *BroadcastChannel) Handle(rs <-chan response, t *Timer) {
// Start all of the handlers
for _, fn := range b.fns {
fn(t)
}
for i := range rs {
b.Broadcast(i)
}
b.Close()
}
// BasicWriteHandler handles write responses.
func BasicWriteHandler(rs <-chan response, wt *Timer) {
n := 0
@ -554,52 +620,6 @@ func BasicWriteHandler(rs <-chan response, wt *Timer) {
fmt.Printf("Points Per Second: %v\n\n", float64(n)*float64(10000)/float64(wt.Elapsed().Seconds()))
}
func (b *BasicClient) HTTPWriteHandler(rs <-chan response, wt *Timer) {
n := 0
success := 0
fail := 0
s := time.Duration(0)
for t := range rs {
// Send off data to influx coordination server
n++
if t.Success() {
success++
} else {
fail++
}
s += t.Timer.Elapsed()
}
if n == 0 {
return
}
pps := float64(n) * float64(b.BatchSize) / float64(wt.Elapsed().Seconds())
vals := url.Values{
"PerfConfig": {"some config file"},
"InfluxConfig": {"some config file"},
"TestId": {"1"},
"Name": {"some name"},
"BatchSize": {fmt.Sprintf("%v", int(b.BatchSize))},
"BatchInterval": {fmt.Sprintf("%v", b.BatchInterval)},
"Concurrency": {fmt.Sprintf("%v", int(b.Concurrency))},
"PointsPerSecond": {fmt.Sprintf("%v", int(pps))},
"FailRequests": {fmt.Sprintf("%v", int(fail))},
"SuccessRequests": {fmt.Sprintf("%v", int(success))},
}
http.PostForm(fmt.Sprintf("http://%s/results", "localhost:8080"), vals)
}
// BasicReadHandler handles read responses.
func BasicReadHandler(r <-chan response, rt *Timer) {
n := 0

View File

@ -8,15 +8,6 @@ import (
"time"
)
// Run handles the logic for running a stress test given a config file
func Run(c *Config) {
w := NewWriter(&c.Write.PointGenerators.Basic, &c.Write.InfluxClients.Basic)
r := NewQuerier(&c.Read.QueryGenerators.Basic, &c.Read.QueryClients.Basic)
s := NewStressTest(&c.Provision.Basic, w, r)
s.Start(BasicWriteHandler, BasicReadHandler)
}
// Point is an interface that is used to represent
// the abstract idea of a point in InfluxDB.
type Point interface {