Add querying across cluster

Add support for precision
pull/4961/head
Michael Desa 2015-12-02 10:29:55 -08:00
parent b0f27beb12
commit f9fa366861
5 changed files with 58 additions and 35 deletions

View File

@ -29,7 +29,7 @@
# Randomize timestamp a bit (not functional)
jitter = true
# Precision of points that are being written
precision = "n"
precision = "s"
# name of the measurement that will be written
measurement = "cpu"
# The date for the first point that is written into influx
@ -56,7 +56,7 @@
# Database that is being written to
database = "stress"
# Precision of points that are being written
precision = "n"
precision = "s"
# Size of batches that are sent to db
batch_size = 10000
# Interval between each batch
@ -82,7 +82,7 @@
# if enabled the reader will actually read
enabled = true
# Address of the instance that will be queried
address = "localhost:8086"
addresses = ["localhost:8086"]
# Database that will be queried
database = "stress"
# Interval bewteen queries

View File

@ -76,6 +76,7 @@ type BasicPointGenerator struct {
Tags AbstractTags `toml:"tag"`
Fields AbstractFields `toml:"field"`
StartDate string `toml:"start_date"`
Precision string `toml:"precision"`
time time.Time
mu sync.Mutex
}
@ -105,6 +106,18 @@ func typeArr(a []string) []interface{} {
return i
}
func (b *BasicPointGenerator) timestamp(t time.Time) int64 {
var n int64
if b.Precision == "s" {
n = t.Unix()
} else {
n = t.UnixNano()
}
return n
}
// Template returns a function that returns a pointer to a Pnt.
func (b *BasicPointGenerator) Template() func(i int, t time.Time) *Pnt {
ts := b.Tags.Template()
@ -115,7 +128,7 @@ func (b *BasicPointGenerator) Template() func(i int, t time.Time) *Pnt {
p := &Pnt{}
arr := []interface{}{i}
arr = append(arr, typeArr(fa)...)
arr = append(arr, t.UnixNano())
arr = append(arr, b.timestamp(t))
str := fmt.Sprintf(tmplt, arr...)
p.Set([]byte(str))
@ -368,26 +381,30 @@ func (q *BasicQuery) SetTime(t time.Time) {
// BasicQueryClient implements the QueryClient interface
type BasicQueryClient struct {
Enabled bool `toml:"enabled"`
Address string `toml:"address"`
Database string `toml:"database"`
QueryInterval string `toml:"query_interval"`
Concurrency int `toml:"concurrency"`
client client.Client
Enabled bool `toml:"enabled"`
Addresses []string `toml:"addresses"`
Database string `toml:"database"`
QueryInterval string `toml:"query_interval"`
Concurrency int `toml:"concurrency"`
clients []client.Client
addrId int
}
// Init initializes the InfluxDB client
func (b *BasicQueryClient) Init() error {
cl, err := client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%v", b.Address),
})
if err != nil {
return err
for _, a := range b.Addresses {
cl, err := client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%v", a),
})
if err != nil {
return err
}
b.clients = append(b.clients, cl)
}
b.client = cl
return nil
}
@ -399,7 +416,7 @@ func (b *BasicQueryClient) Query(cmd Query) (response, error) {
}
t := NewTimer()
_, err := b.client.Query(q)
_, err := b.clients[b.addrId].Query(q)
t.StopTimer()
if err != nil {
@ -431,7 +448,12 @@ func (b *BasicQueryClient) Exec(qs <-chan Query, r chan<- response) error {
return err
}
ctr := 0
for q := range qs {
b.addrId = ctr % len(b.Addresses)
ctr++
wg.Add(1)
counter.Increment()
func(q Query) {
@ -562,13 +584,11 @@ func (b *BasicClient) HTTPWriteHandler(rs <-chan response, wt *Timer) {
pps := float64(n) * float64(b.BatchSize) / float64(wt.Elapsed().Seconds())
vals := url.Values{
"PerfConfig": {"some config file"},
"InfluxConfig": {"some config file"},
"TestId": {"1"},
"Name": {"some name"},
//"PointCount": {fmt.Sprintf("%v", int(b.PointCount))},
"BatchSize": {fmt.Sprintf("%v", int(b.BatchSize))},
//"SeriesCount": {fmt.Sprintf("%v", int(b.SeriesCount))},
"PerfConfig": {"some config file"},
"InfluxConfig": {"some config file"},
"TestId": {"1"},
"Name": {"some name"},
"BatchSize": {fmt.Sprintf("%v", int(b.BatchSize))},
"BatchInterval": {fmt.Sprintf("%v", b.BatchInterval)},
"Concurrency": {fmt.Sprintf("%v", int(b.Concurrency))},
"PointsPerSecond": {fmt.Sprintf("%v", int(pps))},
@ -576,7 +596,7 @@ func (b *BasicClient) HTTPWriteHandler(rs <-chan response, wt *Timer) {
"SuccessRequests": {fmt.Sprintf("%v", int(success))},
}
http.PostForm(fmt.Sprintf("http://%s/results", post), vals)
http.PostForm(fmt.Sprintf("http://%s/results", "localhost:8080"), vals)
}

View File

@ -46,7 +46,8 @@
[read.query_client]
[read.query_client.basic]
address = "localhost:8086"
enabled = true
addresses = ["localhost:8086"]
database = "stress"
query_interval = "100ms"
concurrency = 1

View File

@ -307,7 +307,7 @@ func TestBasicClient_send(t *testing.T) {
}))
defer ts.Close()
basicIC.Addresses[0] = ts.URL[7:]
basicIC.Addresses[0] = ts.URL
b := []byte(
`cpu,host=server-1,location=us-west value=100 12932
cpu,host=server-2,location=us-west value=10 12932
@ -379,7 +379,7 @@ func TestBasicQuery_QueryGenerate(t *testing.T) {
}
var basicQC = &BasicQueryClient{
Address: "localhost:8086",
Addresses: []string{"localhost:8086"},
Database: "stress",
QueryInterval: "10s",
Concurrency: 1,
@ -396,7 +396,7 @@ func TestBasicQueryClient_Query(t *testing.T) {
}))
defer ts.Close()
basicQC.Address = ts.URL[7:]
basicQC.Addresses[0] = ts.URL[7:]
basicQC.Init()
q := "SELECT count(value) FROM cpu"
@ -492,8 +492,8 @@ func Test_NewConfigWithFile(t *testing.T) {
}
qc := r.QueryClients.Basic
if qc.Address != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Address)
if qc.Addresses[0] != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Addresses[0])
}
if qc.Database != "stress" {
t.Errorf("Expected stress got %s", qc.Database)
@ -579,8 +579,8 @@ func Test_NewConfigWithoutFile(t *testing.T) {
}
qc := r.QueryClients.Basic
if qc.Address != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Address)
if qc.Addresses[0] != "localhost:8086" {
t.Errorf("Expected `localhost:8086` got %s", qc.Addresses[0])
}
if qc.Database != "stress" {
t.Errorf("Expected stress got %s", qc.Database)

View File

@ -18,6 +18,7 @@ var s = `
jitter = true
measurement = "cpu"
start_date = "2006-Jan-02"
precision = "n"
[[write.point_generator.basic.tag]]
key = "host"
value = "server"
@ -49,7 +50,8 @@ var s = `
[read.query_client]
[read.query_client.basic]
address = "localhost:8086"
enabled = true
addresses = ["localhost:8086"]
database = "stress"
query_interval = "100ms"
concurrency = 1