diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 6eaee05b76..df5a874843 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -11,7 +11,11 @@ import ( var ( //database = flag.String("database", "", "name of database") - //address = flag.String("addr", "", "IP address and port of database (e.g., localhost:8086)") + address = flag.String("addr", "", "IP address and port of database where response times will persist (e.g., localhost:8086)") + tags = flag.String("tags", "", "") + + //id = flag.String("id", "", "ID for the test that is being ran") + //name = flag.String("name", "", "name of the test that is being ran") config = flag.String("config", "", "The stress test file") cpuprofile = flag.String("cpuprofile", "", "Write the cpu profile to `filename`") @@ -37,7 +41,17 @@ func main() { return } - stress.Run(c) + w := stress.NewWriter(&c.Write.PointGenerators.Basic, &c.Write.InfluxClients.Basic) + r := stress.NewQuerier(&c.Read.QueryGenerators.Basic, &c.Read.QueryClients.Basic) + s := stress.NewStressTest(&c.Provision.Basic, w, r) + + bw := stress.NewBroadcastChannel() + bw.Register(stress.BasicWriteHandler) + + br := stress.NewBroadcastChannel() + br.Register(stress.BasicReadHandler) + + s.Start(bw.Handle, br.Handle) return diff --git a/stress/basic.go b/stress/basic.go index 7a4e678bda..83b657fc78 100644 --- a/stress/basic.go +++ b/stress/basic.go @@ -7,7 +7,7 @@ import ( "io/ioutil" "math/rand" "net/http" - "net/url" + //"net/url" "sync" "time" @@ -521,6 +521,72 @@ func (b *BasicProvisioner) Provision() error { return nil } +type BroadcastChannel struct { + chs []chan response + wg sync.WaitGroup + fns []func(t *Timer) +} + +func NewBroadcastChannel() *BroadcastChannel { + chs := make([]chan response, 0) + + var wg sync.WaitGroup + + b := &BroadcastChannel{ + chs: chs, + wg: wg, + } + + return b +} + +func (b *BroadcastChannel) Register(fn responseHandler) { + ch := make(chan response, 0) + + b.chs = append(b.chs, ch) + + f := func(t *Timer) { + go fn(ch, t) + } + + b.fns = append(b.fns, f) +} + +func (b *BroadcastChannel) Broadcast(r response) { + + b.wg.Add(1) + for _, ch := range b.chs { + b.wg.Add(1) + go func(ch chan response) { + ch <- r + b.wg.Done() + }(ch) + } + b.wg.Done() +} + +func (b *BroadcastChannel) Close() { + b.wg.Wait() + for _, ch := range b.chs { + close(ch) + // Workaround + time.Sleep(1 * time.Second) + } +} + +func (b *BroadcastChannel) Handle(rs <-chan response, t *Timer) { + + // Start all of the handlers + for _, fn := range b.fns { + fn(t) + } + + for i := range rs { + b.Broadcast(i) + } + b.Close() +} + // BasicWriteHandler handles write responses. func BasicWriteHandler(rs <-chan response, wt *Timer) { n := 0 @@ -554,52 +620,6 @@ func BasicWriteHandler(rs <-chan response, wt *Timer) { fmt.Printf("Points Per Second: %v\n\n", float64(n)*float64(10000)/float64(wt.Elapsed().Seconds())) } -func (b *BasicClient) HTTPWriteHandler(rs <-chan response, wt *Timer) { - n := 0 - success := 0 - fail := 0 - - s := time.Duration(0) - - for t := range rs { - - // Send off data to influx coordination server - - n++ - - if t.Success() { - success++ - } else { - fail++ - } - - s += t.Timer.Elapsed() - - } - - if n == 0 { - return - } - - pps := float64(n) * float64(b.BatchSize) / float64(wt.Elapsed().Seconds()) - - vals := url.Values{ - "PerfConfig": {"some config file"}, - "InfluxConfig": {"some config file"}, - "TestId": {"1"}, - "Name": {"some name"}, - "BatchSize": {fmt.Sprintf("%v", int(b.BatchSize))}, - "BatchInterval": {fmt.Sprintf("%v", b.BatchInterval)}, - "Concurrency": {fmt.Sprintf("%v", int(b.Concurrency))}, - "PointsPerSecond": {fmt.Sprintf("%v", int(pps))}, - "FailRequests": {fmt.Sprintf("%v", int(fail))}, - "SuccessRequests": {fmt.Sprintf("%v", int(success))}, - } - - http.PostForm(fmt.Sprintf("http://%s/results", "localhost:8080"), vals) - -} - // BasicReadHandler handles read responses. func BasicReadHandler(r <-chan response, rt *Timer) { n := 0 diff --git a/stress/run.go b/stress/run.go index c0c877ae8d..4aaed4ddc9 100644 --- a/stress/run.go +++ b/stress/run.go @@ -8,15 +8,6 @@ import ( "time" ) -// Run handles the logic for running a stress test given a config file -func Run(c *Config) { - w := NewWriter(&c.Write.PointGenerators.Basic, &c.Write.InfluxClients.Basic) - r := NewQuerier(&c.Read.QueryGenerators.Basic, &c.Read.QueryClients.Basic) - s := NewStressTest(&c.Provision.Basic, w, r) - - s.Start(BasicWriteHandler, BasicReadHandler) -} - // Point is an interface that is used to represent // the abstract idea of a point in InfluxDB. type Point interface {