Merge pull request #4961 from influxdb/md-stress-cluster

Add cluster support to stress test
pull/5013/head
Michael Desa 2015-12-04 15:20:31 -08:00
commit 14b5ed3001
5 changed files with 127 additions and 49 deletions

View File

@ -29,7 +29,7 @@
# Randomize timestamp a bit (not functional)
jitter = true
# Precision of points that are being written
precision = "n"
precision = "s"
# name of the measurement that will be written
measurement = "cpu"
# The date for the first point that is written into influx
@ -51,12 +51,12 @@
[write.influx_client.basic]
# If enabled the writer will actually write
enabled = true
# Address of the Influxdb instance
address = "localhost:8086" # stress_test_server runs on port 1234
# Addresses is an array of the Influxdb instances
addresses = ["localhost:8086"] # stress_test_server runs on port 1234
# Database that is being written to
database = "stress"
# Precision of points that are being written
precision = "n"
precision = "s"
# Size of batches that are sent to db
batch_size = 10000
# Interval between each batch
@ -82,7 +82,7 @@
# if enabled the reader will actually read
enabled = true
# Address of the instance that will be queried
address = "localhost:8086"
addresses = ["localhost:8086"]
# Database that will be queried
database = "stress"
# Interval bewteen queries

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"sync"
"time"
@ -75,6 +76,7 @@ type BasicPointGenerator struct {
Tags AbstractTags `toml:"tag"`
Fields AbstractFields `toml:"field"`
StartDate string `toml:"start_date"`
Precision string `toml:"precision"`
time time.Time
mu sync.Mutex
}
@ -104,6 +106,18 @@ func typeArr(a []string) []interface{} {
return i
}
func (b *BasicPointGenerator) timestamp(t time.Time) int64 {
var n int64
if b.Precision == "s" {
n = t.Unix()
} else {
n = t.UnixNano()
}
return n
}
// Template returns a function that returns a pointer to a Pnt.
func (b *BasicPointGenerator) Template() func(i int, t time.Time) *Pnt {
ts := b.Tags.Template()
@ -114,7 +128,7 @@ func (b *BasicPointGenerator) Template() func(i int, t time.Time) *Pnt {
p := &Pnt{}
arr := []interface{}{i}
arr = append(arr, typeArr(fa)...)
arr = append(arr, t.UnixNano())
arr = append(arr, b.timestamp(t))
str := fmt.Sprintf(tmplt, arr...)
p.Set([]byte(str))
@ -222,15 +236,16 @@ func (b *BasicPointGenerator) Time() time.Time {
// BasicClient implements the InfluxClient
// interface.
type BasicClient struct {
Enabled bool `toml:"enabled"`
Address string `toml:"address"`
Database string `toml:"database"`
Precision string `toml:"precision"`
BatchSize int `toml:"batch_size"`
BatchInterval string `toml:"batch_interval"`
Concurrency int `toml:"concurrency"`
SSL bool `toml:"ssl"`
Format string `toml:"format"`
Enabled bool `toml:"enabled"`
Addresses []string `toml:"addresses"`
Database string `toml:"database"`
Precision string `toml:"precision"`
BatchSize int `toml:"batch_size"`
BatchInterval string `toml:"batch_interval"`
Concurrency int `toml:"concurrency"`
SSL bool `toml:"ssl"`
Format string `toml:"format"`
addrId int
}
// Batch groups together points
@ -238,6 +253,12 @@ func (c *BasicClient) Batch(ps <-chan Point, r chan<- response) error {
if !c.Enabled {
return nil
}
instanceURLs := make([]string, len(c.Addresses))
for i := 0; i < len(c.Addresses); i++ {
instanceURLs[i] = fmt.Sprintf("http://%v/write?db=%v&precision=%v", c.Addresses[i], c.Database, c.Precision)
}
c.Addresses = instanceURLs
var buf bytes.Buffer
var wg sync.WaitGroup
@ -252,6 +273,7 @@ func (c *BasicClient) Batch(ps <-chan Point, r chan<- response) error {
for p := range ps {
b := p.Line()
c.addrId = ctr % len(c.Addresses)
ctr++
buf.Write(b)
@ -311,10 +333,9 @@ func post(url string, datatype string, data io.Reader) (*http.Response, error) {
// Send calls post and returns a response
func (c *BasicClient) send(b []byte) (response, error) {
instanceURL := fmt.Sprintf("http://%v/write?db=%v&precision=%v", c.Address, c.Database, c.Precision)
t := NewTimer()
resp, err := post(instanceURL, "application/x-www-form-urlencoded", bytes.NewBuffer(b))
resp, err := post(c.Addresses[c.addrId], "application/x-www-form-urlencoded", bytes.NewBuffer(b))
t.StopTimer()
if err != nil {
return response{Timer: t}, err
@ -360,26 +381,30 @@ func (q *BasicQuery) SetTime(t time.Time) {
// BasicQueryClient implements the QueryClient interface
type BasicQueryClient struct {
Enabled bool `toml:"enabled"`
Address string `toml:"address"`
Database string `toml:"database"`
QueryInterval string `toml:"query_interval"`
Concurrency int `toml:"concurrency"`
client client.Client
Enabled bool `toml:"enabled"`
Addresses []string `toml:"addresses"`
Database string `toml:"database"`
QueryInterval string `toml:"query_interval"`
Concurrency int `toml:"concurrency"`
clients []client.Client
addrId int
}
// Init initializes the InfluxDB client
func (b *BasicQueryClient) Init() error {
cl, err := client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%v", b.Address),
})
if err != nil {
return err
for _, a := range b.Addresses {
cl, err := client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%v", a),
})
if err != nil {
return err
}
b.clients = append(b.clients, cl)
}
b.client = cl
return nil
}
@ -391,7 +416,7 @@ func (b *BasicQueryClient) Query(cmd Query) (response, error) {
}
t := NewTimer()
_, err := b.client.Query(q)
_, err := b.clients[b.addrId].Query(q)
t.StopTimer()
if err != nil {
@ -423,7 +448,12 @@ func (b *BasicQueryClient) Exec(qs <-chan Query, r chan<- response) error {
return err
}
ctr := 0
for q := range qs {
b.addrId = ctr % len(b.Addresses)
ctr++
wg.Add(1)
counter.Increment()
func(q Query) {
@ -524,6 +554,52 @@ func BasicWriteHandler(rs <-chan response, wt *Timer) {
fmt.Printf("Points Per Second: %v\n\n", float64(n)*float64(10000)/float64(wt.Elapsed().Seconds()))
}
func (b *BasicClient) HTTPWriteHandler(rs <-chan response, wt *Timer) {
n := 0
success := 0
fail := 0
s := time.Duration(0)
for t := range rs {
// Send off data to influx coordination server
n++
if t.Success() {
success++
} else {
fail++
}
s += t.Timer.Elapsed()
}
if n == 0 {
return
}
pps := float64(n) * float64(b.BatchSize) / float64(wt.Elapsed().Seconds())
vals := url.Values{
"PerfConfig": {"some config file"},
"InfluxConfig": {"some config file"},
"TestId": {"1"},
"Name": {"some name"},
"BatchSize": {fmt.Sprintf("%v", int(b.BatchSize))},
"BatchInterval": {fmt.Sprintf("%v", b.BatchInterval)},
"Concurrency": {fmt.Sprintf("%v", int(b.Concurrency))},
"PointsPerSecond": {fmt.Sprintf("%v", int(pps))},
"FailRequests": {fmt.Sprintf("%v", int(fail))},
"SuccessRequests": {fmt.Sprintf("%v", int(success))},
}
http.PostForm(fmt.Sprintf("http://%s/results", "localhost:8080"), vals)
}
// BasicReadHandler handles read responses.
func BasicReadHandler(r <-chan response, rt *Timer) {
n := 0

View File

@ -29,7 +29,7 @@
[write.influx_client]
[write.influx_client.basic]
enabled = true
address = "localhost:8086" # stress_test_server runs on port 1234
addresses = ["localhost:8086","localhost:1234","localhost:5678"] # stress_test_server runs on port 1234
database = "stress"
precision = "n"
batch_size = 10000
@ -46,7 +46,8 @@
[read.query_client]
[read.query_client.basic]
address = "localhost:8086"
enabled = true
addresses = ["localhost:8086"]
database = "stress"
query_interval = "100ms"
concurrency = 1

View File

@ -287,7 +287,7 @@ func Test_post(t *testing.T) {
}
var basicIC = &BasicClient{
Address: "localhost:8086",
Addresses: []string{"localhost:8086"},
Database: "stress",
Precision: "n",
BatchSize: 1000,
@ -307,7 +307,7 @@ func TestBasicClient_send(t *testing.T) {
}))
defer ts.Close()
basicIC.Address = ts.URL[7:]
basicIC.Addresses[0] = ts.URL
b := []byte(
`cpu,host=server-1,location=us-west value=100 12932
cpu,host=server-2,location=us-west value=10 12932
@ -334,7 +334,7 @@ func TestBasicClient_Batch(t *testing.T) {
}))
defer ts.Close()
basicIC.Address = ts.URL[7:]
basicIC.Addresses[0] = ts.URL[7:]
go func(c chan Point) {
defer close(c)
@ -379,7 +379,7 @@ func TestBasicQuery_QueryGenerate(t *testing.T) {
}
var basicQC = &BasicQueryClient{
Address: "localhost:8086",
Addresses: []string{"localhost:8086"},
Database: "stress",
QueryInterval: "10s",
Concurrency: 1,
@ -396,7 +396,7 @@ func TestBasicQueryClient_Query(t *testing.T) {
}))
defer ts.Close()
basicQC.Address = ts.URL[7:]
basicQC.Addresses[0] = ts.URL[7:]
basicQC.Init()
q := "SELECT count(value) FROM cpu"
@ -458,8 +458,8 @@ func Test_NewConfigWithFile(t *testing.T) {
// TODO: Check fields
wc := w.InfluxClients.Basic
if wc.Address != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", wc.Address)
if wc.Addresses[0] != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", wc.Addresses[0])
}
if wc.Database != "stress" {
t.Errorf("Expected stress got %s", wc.Database)
@ -492,8 +492,8 @@ func Test_NewConfigWithFile(t *testing.T) {
}
qc := r.QueryClients.Basic
if qc.Address != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Address)
if qc.Addresses[0] != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Addresses[0])
}
if qc.Database != "stress" {
t.Errorf("Expected stress got %s", qc.Database)
@ -545,8 +545,8 @@ func Test_NewConfigWithoutFile(t *testing.T) {
// TODO: Check fields
wc := w.InfluxClients.Basic
if wc.Address != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", wc.Address)
if wc.Addresses[0] != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", wc.Addresses[0])
}
if wc.Database != "stress" {
t.Errorf("Expected stress got %s", wc.Database)
@ -579,8 +579,8 @@ func Test_NewConfigWithoutFile(t *testing.T) {
}
qc := r.QueryClients.Basic
if qc.Address != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Address)
if qc.Addresses[0] != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Addresses[0])
}
if qc.Database != "stress" {
t.Errorf("Expected stress got %s", qc.Database)

View File

@ -18,6 +18,7 @@ var s = `
jitter = true
measurement = "cpu"
start_date = "2006-Jan-02"
precision = "n"
[[write.point_generator.basic.tag]]
key = "host"
value = "server"
@ -32,8 +33,7 @@ var s = `
[write.influx_client]
[write.influx_client.basic]
enabled = true
#address = "localhost:1234"
address = "localhost:8086"
addresses = ["localhost:8086"]
database = "stress"
precision = "n"
batch_size = 5000
@ -50,7 +50,8 @@ var s = `
[read.query_client]
[read.query_client.basic]
address = "localhost:8086"
enabled = true
addresses = ["localhost:8086"]
database = "stress"
query_interval = "100ms"
concurrency = 1