Clean up code, fix struct and file names to fit with golang convention

pull/6855/head
Jack Zampolin 2016-06-15 16:00:06 -07:00
parent e6cd4c7313
commit e39dcaf8f0
43 changed files with 439 additions and 502 deletions

View File

@ -72,6 +72,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6882](https://github.com/influxdata/influxdb/pull/6882): Remove a double lock in the tsm1 index writer.
- [#6883](https://github.com/influxdata/influxdb/pull/6883): Rename dumptsmdev to dumptsm in influx_inspect.
- [#6864](https://github.com/influxdata/influxdb/pull/6864): Allow a non-admin to call "use" for the influx cli.
- [#6855](https://github.com/influxdata/influxdb/pull/6855): Update `stress/v2` to work with clusters, ssl, and username/password auth. Code cleanup
## v0.13.0 [2016-05-12]

View File

@ -36,7 +36,7 @@ func main() {
if *config != "" {
v2.RunStress(*config)
} else {
v2.RunStress("stress/v2/file.iql")
v2.RunStress("stress/v2/iql/file.iql")
}
} else {

View File

@ -1,3 +0,0 @@
/iql
stress-tool-refactor
*.txt

View File

@ -17,9 +17,9 @@ The tool has the following components:
- `QUERY` - Runs a given query or generates sample queries given a companion `INSERT` statement
- `SET` - Changes the test parameters. Defaults are listed in the `README.md`
- `WAIT` - Required after a `GO` statement. Blocks till all proceeding statements finish.
* Clients - The statement, results and InfluxDB clients. This code lives in `v2/ponyExpress`
- `Storefront` - The `Statement` client. Also contains the results client.
- `ponyExpress` - A performant InfluxDB client. Makes `GET /query` and `POST /write` requests. Forwards the results to the results client.
* Clients - The statement, results and InfluxDB clients. This code lives in `v2/stress_client`
- `StressTest` - The `Statement` client. Also contains the results client.
- `stressClient` - A performant InfluxDB client. Makes `GET /query` and `POST /write` requests. Forwards the results to the results client.
![Influx Stress Design](./influx_stress_v2.png)
@ -28,8 +28,8 @@ The tool has the following components:
`Statement` is an interface defined in `v2/statement/statement.go`:
```go
type Statement interface {
Run(s *ponyExpress.StoreFront)
Report(s *ponyExpress.StoreFront) string
Run(s *stressClient.StressTest)
Report(s *stressClient.StressTest) string
SetID(s string)
}
```
@ -37,11 +37,11 @@ type Statement interface {
* `Report` retrieves and collates all recorded test data from the reporting InfluxDB instance.
* `SetID` gives the statement an ID. Used in the parser. Each `statementID` is an 8 character random string used for reporting.
### `Statement` -> `Storefront`
### `Statement` -> `StressTest`
`Statement`s send `Packages` (queries or writes to the target database) or `Directives` (for changing test state) through the `StoreFront` to the `ponyExpress` where they are processed.
`Statement`s send `Package`s (queries or writes to the target database) or `Directives` (for changing test state) through the `StressTest` to the `stressClient` where they are processed.
```go
// v2/ponyExpress/package.go
// v2/stress_client/package.go
// T is Query or Write
// StatementID is for reporting
@ -52,9 +52,9 @@ type Package struct {
Tracer *Tracer
}
// v2/ponyExpress/directive.go
// v2/stress_client/directive.go
// Property is test state to change
// Property is test state variable to change
// Value is the new value
type Directive struct {
Property string
@ -63,10 +63,10 @@ type Directive struct {
}
```
The `Tracer` on both of these packages contains a `sync.WaitGroup` that prevents `Statement`s from returning before all their operations are finished. This `WaitGroup` is incremented in the `Run()` of the statement and decremented in `*StoreFront.resultsListen()` after results are recorded in the database. This is well documented with inline comments. `Tracer`s also carry optional tags for reporting purposes.
The `Tracer` on both of these packages contains a `sync.WaitGroup` that prevents `Statement`s from returning before all their operations are finished. This `WaitGroup` is incremented in the `Run()` of the statement and decremented in `*StressTest.resultsListen()` after results are recorded in the database. This is well documented with inline comments. `Tracer`s also carry optional tags for reporting purposes.
```go
// v2/ponyExpress/tracer.go
// v2/stress_client/tracer.go
type Tracer struct {
Tags map[string]string
@ -74,12 +74,12 @@ type Tracer struct {
}
```
### `StoreFront`
### `StressTest`
The `StoreFront` is the client for the statements through the `*StoreFront.SendPackage()` and `*StoreFront.SendDirective()` functions. It also contains some test state and the `ResultsClient`.
The `StressTest` is the client for the statements through the `*StressTest.SendPackage()` and `*StressTest.SendDirective()` functions. It also contains some test state and the `ResultsClient`.
```go
type StoreFront struct {
type StressTest struct {
TestID string
TestName string
@ -101,19 +101,19 @@ type StoreFront struct {
### Reporting Client
The `ResultsClient` turns raw responses from InfluxDB into properly tagged points containing any relevant information for storage in another InfluxDB instance. The code for creating those points lives in `v2/ponyExpress/reporting.go`
The `ResultsClient` turns raw responses from InfluxDB into properly tagged points containing any relevant information for storage in another InfluxDB instance. The code for creating those points lives in `v2/stress_client/reporting.go`
### InfluxDB Instance (reporting)
This is `localhost:8086` by default. The results are currently stored in the `_DefaultTestName` database. This is going to be changed.
This is `localhost:8086` by default. The results are currently stored in the `_stressTest` database.
### `ponyExpress`
### `stressClient`
An InfluxDB client designed for speed. `ponyExpress` also holds most test state.
An InfluxDB client designed for speed. `stressClient` also holds most test state.
```go
// v2/ponyExpress/ponyExpress.go
type ponyExpress struct {
// v2/stress_client/stress_client.go
type stressClient struct {
testID string
// State for the Stress Test
@ -144,7 +144,7 @@ type ponyExpress struct {
rc *ConcurrencyLimiter
}
```
Code for handling the write path is in `v2/ponyExpress/ponyExpress_write.go` while the query path is in `v2/ponyExpress/ponyExpress_query.go`.
Code for handling the write path is in `v2/stress_client/stress_client_write.go` while the query path is in `v2/stress_client/stress_client_query.go`.
### InfluxDB Instance (stress test target)
@ -152,10 +152,10 @@ The InfluxDB which is being put under stress.
### response data
`Response`s carry points from `ponyExpress` to the `ResultsClient`.
`Response`s carry points from `stressClient` to the `ResultsClient`.
```go
// v2/ponyExpress/response.go
// v2/stress_client/response.go
type Response struct {
Point *influx.Point
Tracer *Tracer

View File

@ -1,17 +1,5 @@
# Influx Stress tool
Blockers to finishing:
* Finalize reporting
- Decide on how to incorporate TestName (db[difficult], measurement[refactor], tag[easy])
- Get feedback on reporting syntax
- Pull addition data from queries
* Documentation is sorely lacking.
- Parser behavior and proper `.iql` syntax
- How the templated query generation works
- Collection of tested `.iql` files to simulate different loads
Commune is potentially blocking writes, look into performance
This stress tool works from list of InfluxQL-esque statements. The language has been extended to allow for some basic templating of fields, tags and measurements in both line protocol and query statements.
By default the test outputs a human readable report to `STDOUT` and records test statistics in an active installation of InfluxDB at `localhost:8086`.
@ -19,17 +7,26 @@ By default the test outputs a human readable report to `STDOUT` and records test
To set state variables for the test such as the address of the Influx node use the following syntax:
```
# The values listed below are the default values for each of the parameters
# Pipe delineated list of addresses. For cluster: [192.168.0.10:8086|192.168.0.2:8086|192.168.0.3:8086]
# Queries currently hit only the first node in a list. Writes are round robin.
# Queries and writes are round-robin to the configured addresses.
SET Addresses [localhost:8086]
# Influx instance to store results
SET ResultsAddress [localhost:8086]
# False (default) uses http, true uses https
SET SSL [false]
# Username for targeted influx server or cluster
SET Username []
# Password for targeted influx server or cluster
SET Password []
# Database to target for queries and writes. Works like the InfluxCLI USE
SET Database [thing2]
SET Database [stress]
# Precision for the data being written
# Only s and ns supported
SET Precision [s]
# Date the first written point will be timestamped
@ -157,3 +154,19 @@ WAIT -> 624.585319ms
[√] "DROP DATABASE thing" -> 991.088464ms
[√] "DROP DATABASE thing2" -> 421.362831ms
```
### Next Steps:
##### Reporting
- Only use one database for reporting
- Get feedback on reporting syntax
- Pull addition data from queries
##### Documentation
- Parser behavior and proper `.iql` syntax
- How the templated query generation works
- Collection of tested `.iql` files to simulate different loads
##### Performance
- `Commune` is potentially blocking writes, look into performance.
- Templated query generation is currently in a quazi-working state.

13
stress/v2/iql/default.iql Normal file
View File

@ -0,0 +1,13 @@
CREATE DATABASE stress
GO INSERT cpu
cpu,
host=server-[int inc(0) 100000],location=us-west
value=[int rand(100) 0]
10000000 10s
GO QUERY cpu
SELECT count(value) FROM cpu WHERE %t
DO 250
WAIT

View File

@ -6,7 +6,7 @@ import (
"time"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
"github.com/influxdata/influxdb/stress/v2/stressql"
)
@ -14,7 +14,7 @@ import (
func RunStress(file string) {
// Spin up the Client
s := ponyExpress.NewStoreFront()
s := stressClient.NewStressTest()
// Parse the file into Statements
stmts, err := stressql.ParseStatements(file)
@ -40,7 +40,7 @@ func RunStress(file string) {
}
}
func blankResponse() ponyExpress.Response {
func blankResponse() stressClient.Response {
// Points must have at least one field
fields := map[string]interface{}{"done": true}
// Make a 'blank' point
@ -50,10 +50,10 @@ func blankResponse() ponyExpress.Response {
log.Fatalf("Error creating blank response point\n error: %v\n", err)
}
// Add a tracer to prevent program from returning too early
tracer := ponyExpress.NewTracer(make(map[string]string))
tracer := stressClient.NewTracer(make(map[string]string))
// Add to the WaitGroup
tracer.Add(1)
// Make a new response with the point and the tracer
resp := ponyExpress.NewResponse(p, tracer)
resp := stressClient.NewResponse(p, tracer)
return resp
}

View File

@ -1,209 +0,0 @@
package ponyExpress
import (
"fmt"
"log"
"sync"
influx "github.com/influxdata/influxdb/client/v2"
)
// NewStoreFront creates the backend for the stress test
func NewStoreFront() *StoreFront {
// Make the Package and Directive chans
packageCh := make(chan Package, 0)
directiveCh := make(chan Directive, 0)
// Make the Response chan
responseCh := make(chan Response, 0)
s := &StoreFront{
TestName: "DefaultTestName",
Precision: "s",
StartDate: "2016-01-02",
BatchSize: 5000,
packageChan: packageCh,
directiveChan: directiveCh,
ResultsChan: responseCh,
communes: make(map[string]*commune),
TestID: randStr(10),
}
// Set the results instance to localhost:8086 by default
s.SetResultsClient(influx.HTTPConfig{
Addr: fmt.Sprintf("http://%v/", "localhost:8086"),
})
// Start the client service
startPonyExpress(packageCh, directiveCh, responseCh, s.TestID)
// Listen for Results coming in
s.resultsListen()
return s
}
// NewTestStoreFront returns a StoreFront to be used for testing Statements
func NewTestStoreFront() (*StoreFront, chan Package, chan Directive) {
packageCh := make(chan Package, 0)
directiveCh := make(chan Directive, 0)
s := &StoreFront{
TestName: "DefaultTestName",
Precision: "s",
StartDate: "2016-01-02",
BatchSize: 5000,
directiveChan: directiveCh,
packageChan: packageCh,
communes: make(map[string]*commune),
TestID: randStr(10),
}
return s, packageCh, directiveCh
}
// The StoreFront is the Statement facing API that consume Statement output and coordinates the test results
type StoreFront struct {
TestID string
TestName string
Precision string
StartDate string
BatchSize int
sync.WaitGroup
sync.Mutex
packageChan chan<- Package
directiveChan chan<- Directive
ResultsChan chan Response
communes map[string]*commune
ResultsClient influx.Client
}
// SendPackage is the public facing API for to send Queries and Points
func (sf *StoreFront) SendPackage(p Package) {
sf.packageChan <- p
}
// SendDirective is the public facing API to set state variables in the test
func (sf *StoreFront) SendDirective(d Directive) {
sf.directiveChan <- d
}
// Starts a go routine that listens for Results
func (sf *StoreFront) resultsListen() {
// Make sure databases for results are created
sf.createDatabase(fmt.Sprintf("_%v", sf.TestName))
sf.createDatabase(sf.TestName)
// Listen for Responses
go func() {
// Prepare a BatchPointsConfig
bpconf := influx.BatchPointsConfig{
Database: fmt.Sprintf("_%v", sf.TestName),
Precision: "ns",
}
// Prepare the first batch of points
bp, _ := influx.NewBatchPoints(bpconf)
// TODO: Panics on resp.Tracer.Done() if there are too many 500s in a row
// Loop over ResultsChan
for resp := range sf.ResultsChan {
switch resp.Point.Name() {
// If the done point comes down the channel write the results
case "done":
sf.ResultsClient.Write(bp)
// Decrement the tracer
resp.Tracer.Done()
// By default fall back to the batcher
default:
// Add the StoreFront tags
pt := resp.AddTags(sf.tags())
// Add the point to the batch
bp = sf.batcher(pt, bp, bpconf)
// Decrement the tracer
resp.Tracer.Done()
}
}
}()
}
// Batches incoming Result.Point and sends them if the batch reaches 5k in sizes
func (sf *StoreFront) batcher(pt *influx.Point, bp influx.BatchPoints, bpconf influx.BatchPointsConfig) influx.BatchPoints {
// If fewer than 5k add point and return
if len(bp.Points()) <= 5000 {
bp.AddPoint(pt)
} else {
// Otherwise send the batch
err := sf.ResultsClient.Write(bp)
// Check error
if err != nil {
log.Fatalf("Error writing performance stats\n error: %v\n", err)
}
// Reset the batch of points
bp, _ = influx.NewBatchPoints(bpconf)
}
return bp
}
// SetResultsClient is the utility for reseting the address of the ResultsClient
func (sf *StoreFront) SetResultsClient(conf influx.HTTPConfig) {
clnt, err := influx.NewHTTPClient(conf)
if err != nil {
log.Fatalf("Error resetting results clien\n error: %v\n", err)
}
sf.ResultsClient = clnt
}
// Convinence database creation function
func (sf *StoreFront) createDatabase(db string) {
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v", db)
sf.ResultsClient.Query(influx.Query{Command: query})
}
// GetStatementResults is a convinence function for fetching all results given a StatementID
func (sf *StoreFront) GetStatementResults(sID, t string) (res []influx.Result) {
// Make the template string
qryStr := fmt.Sprintf(`SELECT * FROM "%v" WHERE statement_id = '%v'`, t, sID)
// Make the query and return the results
return sf.queryTestResults(qryStr)
}
// Runs given qry on the test results database and returns the results or nil in case of error
func (sf *StoreFront) queryTestResults(qry string) (res []influx.Result) {
q := influx.Query{
Command: qry,
Database: fmt.Sprintf("_%v", sf.TestName),
}
response, err := sf.ResultsClient.Query(q)
if err == nil {
if response.Error() != nil {
log.Fatalf("Error sending results query\n error: %v\n", response.Error())
}
}
res = response.Results
// If there are no results this indicates some kind of error
if res[0].Series == nil {
return nil
}
return res
}

View File

@ -3,7 +3,7 @@ package statement
import (
"time"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// ExecStatement run outside scripts. This functionality is not built out
@ -21,12 +21,12 @@ func (i *ExecStatement) SetID(s string) {
}
// Run statisfies the Statement Interface
func (i *ExecStatement) Run(s *ponyExpress.StoreFront) {
func (i *ExecStatement) Run(s *stressClient.StressTest) {
runtime := time.Now()
i.runtime = time.Since(runtime)
}
// Report statisfies the Statement Interface
func (i *ExecStatement) Report(s *ponyExpress.StoreFront) string {
func (i *ExecStatement) Report(s *stressClient.StressTest) string {
return ""
}

View File

@ -3,7 +3,7 @@ package statement
import (
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestExecSetID(t *testing.T) {
@ -17,7 +17,7 @@ func TestExecSetID(t *testing.T) {
func TestExecRun(t *testing.T) {
e := newTestExec()
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
e.Run(s)
if e == nil {
t.Fail()
@ -26,7 +26,7 @@ func TestExecRun(t *testing.T) {
func TestExecReport(t *testing.T) {
e := newTestExec()
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
rep := e.Report(s)
if rep != "" {
t.Fail()

View File

@ -2,8 +2,9 @@ package statement
import (
"fmt"
"time"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// GoStatement is a Statement Implementation to allow other statements to be run concurrently
@ -19,7 +20,13 @@ func (i *GoStatement) SetID(s string) {
}
// Run statisfies the Statement Interface
func (i *GoStatement) Run(s *ponyExpress.StoreFront) {
func (i *GoStatement) Run(s *stressClient.StressTest) {
// TODO: remove
switch i.Statement.(type) {
case *QueryStatement:
time.Sleep(1 * time.Second)
}
s.Add(1)
go func() {
i.Statement.Run(s)
@ -28,6 +35,6 @@ func (i *GoStatement) Run(s *ponyExpress.StoreFront) {
}
// Report statisfies the Statement Interface
func (i *GoStatement) Report(s *ponyExpress.StoreFront) string {
func (i *GoStatement) Report(s *stressClient.StressTest) string {
return fmt.Sprintf("Go %v", i.Statement.Report(s))
}

View File

@ -3,7 +3,7 @@ package statement
import (
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestGoSetID(t *testing.T) {
@ -17,7 +17,7 @@ func TestGoSetID(t *testing.T) {
func TestGoRun(t *testing.T) {
e := newTestGo()
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
e.Run(s)
if e == nil {
t.Fail()
@ -26,7 +26,7 @@ func TestGoRun(t *testing.T) {
func TestGoReport(t *testing.T) {
e := newTestGo()
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
report := e.Report(s)
if report != "Go " {
t.Errorf("Expected: %v\nGot: %v\n", "Go ", report)

View File

@ -4,14 +4,14 @@ import (
"log"
"time"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// InfluxqlStatement is a Statement Implementation that allows statements that parse in InfluxQL to be passed directly to the target instance
type InfluxqlStatement struct {
StatementID string
Query string
Tracer *ponyExpress.Tracer
Tracer *stressClient.Tracer
}
func (i *InfluxqlStatement) tags() map[string]string {
@ -25,13 +25,13 @@ func (i *InfluxqlStatement) SetID(s string) {
}
// Run statisfies the Statement Interface
func (i *InfluxqlStatement) Run(s *ponyExpress.StoreFront) {
func (i *InfluxqlStatement) Run(s *stressClient.StressTest) {
// Set the tracer
i.Tracer = ponyExpress.NewTracer(i.tags())
i.Tracer = stressClient.NewTracer(i.tags())
// Make the Package
p := ponyExpress.NewPackage(ponyExpress.Query, []byte(i.Query), i.StatementID, i.Tracer)
p := stressClient.NewPackage(stressClient.Query, []byte(i.Query), i.StatementID, i.Tracer)
// Increment the tracer
i.Tracer.Add(1)
@ -45,7 +45,7 @@ func (i *InfluxqlStatement) Run(s *ponyExpress.StoreFront) {
// Report statisfies the Statement Interface
// No test coverage, fix
func (i *InfluxqlStatement) Report(s *ponyExpress.StoreFront) (out string) {
func (i *InfluxqlStatement) Report(s *stressClient.StressTest) (out string) {
allData := s.GetStatementResults(i.StatementID, "query")
iqlr := &influxQlReport{

View File

@ -3,7 +3,7 @@ package statement
import (
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestInfluxQlSetID(t *testing.T) {
@ -17,10 +17,10 @@ func TestInfluxQlSetID(t *testing.T) {
func TestInfluxQlRun(t *testing.T) {
e := newTestInfluxQl()
s, packageCh, _ := ponyExpress.NewTestStoreFront()
s, packageCh, _ := stressClient.NewTestStressTest()
go func() {
for pkg := range packageCh {
if pkg.T != ponyExpress.Query {
if pkg.T != stressClient.Query {
t.Errorf("Expected package to be Query\nGot: %v", pkg.T)
}
if string(pkg.Body) != e.Query {
@ -38,7 +38,7 @@ func TestInfluxQlRun(t *testing.T) {
func newTestInfluxQl() *InfluxqlStatement {
return &InfluxqlStatement{
Query: "CREATE DATABASE foo",
Tracer: ponyExpress.NewTracer(make(map[string]string)),
Tracer: stressClient.NewTracer(make(map[string]string)),
StatementID: "fooID",
}
}

View File

@ -9,7 +9,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// InsertStatement is a Statement Implementation that creates points to be written to the target InfluxDB instance
@ -27,7 +27,7 @@ type InsertStatement struct {
TagCount int
// The Tracer prevents InsertStatement.Run() from returning early
Tracer *ponyExpress.Tracer
Tracer *stressClient.Tracer
// Timestamp is #points to write and percision
Timestamp *Timestamp
@ -65,7 +65,7 @@ func (i *InsertStatement) SetID(s string) {
}
// SetVars sets up the environment for InsertStatement to call it's Run function
func (i *InsertStatement) SetVars(s *ponyExpress.StoreFront) chan<- string {
func (i *InsertStatement) SetVars(s *stressClient.StressTest) chan<- string {
// Set the #series at 1 to start
i.series = 1
@ -80,19 +80,19 @@ func (i *InsertStatement) SetVars(s *ponyExpress.StoreFront) chan<- string {
// Set the time function, keeps track of 'time' of the points being created
i.time = i.Timestamp.Time(s.StartDate, i.series, s.Precision)
// Set a commune on the StoreFront
// Set a commune on the StressTest
s.Lock()
comCh := s.SetCommune(i.Name)
s.Unlock()
// Set the tracer
i.Tracer = ponyExpress.NewTracer(i.tags())
i.Tracer = stressClient.NewTracer(i.tags())
return comCh
}
// Run statisfies the Statement Interface
func (i *InsertStatement) Run(s *ponyExpress.StoreFront) {
func (i *InsertStatement) Run(s *stressClient.StressTest) {
// Set variables on the InsertStatement and make the comCh
comCh := i.SetVars(s)
@ -126,7 +126,7 @@ func (i *InsertStatement) Run(s *ponyExpress.StoreFront) {
b = b[0 : len(b)-1]
// Create the package
p := ponyExpress.NewPackage(ponyExpress.Write, b, i.StatementID, i.Tracer)
p := stressClient.NewPackage(stressClient.Write, b, i.StatementID, i.Tracer)
// Use Tracer to wait for all operations to finish
i.Tracer.Add(1)
@ -159,7 +159,7 @@ func (i *InsertStatement) Run(s *ponyExpress.StoreFront) {
b = b[0 : len(b)-1]
// Create the package
p := ponyExpress.NewPackage(ponyExpress.Write, b, i.StatementID, i.Tracer)
p := stressClient.NewPackage(stressClient.Write, b, i.StatementID, i.Tracer)
// Use Tracer to wait for all operations to finish
i.Tracer.Add(1)
@ -176,8 +176,8 @@ func (i *InsertStatement) Run(s *ponyExpress.StoreFront) {
}
// Report statisfies the Statement Interface
func (i *InsertStatement) Report(s *ponyExpress.StoreFront) string {
// Pull data via StoreFront client
func (i *InsertStatement) Report(s *stressClient.StressTest) string {
// Pull data via StressTest client
allData := s.GetStatementResults(i.StatementID, "write")
if allData == nil || allData[0].Series == nil {

View File

@ -4,7 +4,7 @@ import (
"strings"
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestInsertSetID(t *testing.T) {
@ -18,7 +18,7 @@ func TestInsertSetID(t *testing.T) {
func TestInsertRun(t *testing.T) {
i := newTestInsert()
s, packageCh, _ := ponyExpress.NewTestStoreFront()
s, packageCh, _ := stressClient.NewTestStressTest()
// Listen to the other side of the directiveCh
go func() {
for pkg := range packageCh {

View File

@ -6,7 +6,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// QueryStatement is a Statement Implementation to run queries on the target InfluxDB instance
@ -22,7 +22,7 @@ type QueryStatement struct {
Count int
// Tracer for tracking returns
Tracer *ponyExpress.Tracer
Tracer *stressClient.Tracer
// track time for all queries
runtime time.Duration
@ -40,10 +40,9 @@ func (i *QueryStatement) SetID(s string) {
}
// Run statisfies the Statement Interface
func (i *QueryStatement) Run(s *ponyExpress.StoreFront) {
func (i *QueryStatement) Run(s *stressClient.StressTest) {
// Set the tracer
i.Tracer = ponyExpress.NewTracer(i.tags())
i.Tracer = stressClient.NewTracer(i.tags())
vals := make(map[string]interface{})
@ -58,7 +57,7 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) {
b := []byte(i.TemplateString)
// Make the package
p := ponyExpress.NewPackage(ponyExpress.Query, b, i.StatementID, i.Tracer)
p := stressClient.NewPackage(stressClient.Query, b, i.StatementID, i.Tracer)
// Increment the tracer
i.Tracer.Add(1)
@ -71,7 +70,7 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) {
// TODO: Currently the program lock up here if s.GetPoint
// cannot return a value, which can happen.
// Seee insert.go
// See insert.go
s.Lock()
point = s.GetPoint(i.Name, s.Precision)
s.Unlock()
@ -82,7 +81,7 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) {
b := []byte(fmt.Sprintf(i.TemplateString, setArgs(vals, i.Args)...))
// Make the package
p := ponyExpress.NewPackage(ponyExpress.Query, b, i.StatementID, i.Tracer)
p := stressClient.NewPackage(stressClient.Query, b, i.StatementID, i.Tracer)
// Increment the tracer
i.Tracer.Add(1)
@ -101,8 +100,8 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) {
}
// Report statisfies the Statement Interface
func (i *QueryStatement) Report(s *ponyExpress.StoreFront) string {
// Pull data via StoreFront client
func (i *QueryStatement) Report(s *stressClient.StressTest) string {
// Pull data via StressTest client
allData := s.GetStatementResults(i.StatementID, "query")
if len(allData) == 0 || allData[0].Series == nil {

View File

@ -3,7 +3,7 @@ package statement
import (
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestQuerySetID(t *testing.T) {
@ -17,7 +17,7 @@ func TestQuerySetID(t *testing.T) {
func TestQueryRun(t *testing.T) {
i := newTestQuery()
s, packageCh, _ := ponyExpress.NewTestStoreFront()
s, packageCh, _ := stressClient.NewTestStressTest()
// Listen to the other side of the directiveCh
go func() {
for pkg := range packageCh {
@ -37,6 +37,6 @@ func newTestQuery() *QueryStatement {
TemplateString: "SELECT count(value) FROM cpu",
Args: []string{},
Count: 5,
Tracer: ponyExpress.NewTracer(map[string]string{}),
Tracer: stressClient.NewTracer(map[string]string{}),
}
}

View File

@ -4,8 +4,7 @@ import (
"fmt"
"strings"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// SetStatement set state variables for the test
@ -15,7 +14,7 @@ type SetStatement struct {
StatementID string
Tracer *ponyExpress.Tracer
Tracer *stressClient.Tracer
}
// SetID statisfies the Statement Interface
@ -24,57 +23,30 @@ func (i *SetStatement) SetID(s string) {
}
// Run statisfies the Statement Interface
func (i *SetStatement) Run(s *ponyExpress.StoreFront) {
// Set the Tracer
i.Tracer = ponyExpress.NewTracer(make(map[string]string))
// Create a new Directive
d := ponyExpress.NewDirective(strings.ToLower(i.Var), strings.ToLower(i.Value), i.Tracer)
func (i *SetStatement) Run(s *stressClient.StressTest) {
i.Tracer = stressClient.NewTracer(make(map[string]string))
d := stressClient.NewDirective(strings.ToLower(i.Var), strings.ToLower(i.Value), i.Tracer)
switch d.Property {
// Needs to be set on both StoreFront and ponyExpress
// Needs to be set on both StressTest and stressClient
// Set the write percison for points generated
case "precision":
s.Precision = d.Value
// Increment the tracer
i.Tracer.Add(1)
s.SendDirective(d)
// Lives on StoreFront
// Lives on StressTest
// Set the date for the first point entered into the database
case "startdate":
s.Lock()
s.StartDate = d.Value
s.Unlock()
// Lives on StoreFront
// Lives on StressTest
// Set the BatchSize for writes
case "batchsize":
s.Lock()
s.BatchSize = parseInt(d.Value)
s.Unlock()
// Lives on StoreFront
// Reset the ResultsClient to have a new address
case "resultsaddress":
s.Lock()
s.SetResultsClient(influx.HTTPConfig{Addr: fmt.Sprintf("http://%v/", d.Value)})
s.Unlock()
// TODO: Make TestName actually change the reporting DB
// Lives on StoreFront
// Set the TestName that controls reporting DB
case "testname":
s.Lock()
s.TestName = d.Value
s.Unlock()
// All other variables live on ponyExpress
// All other variables live on stressClient
default:
// Increment the tracer
i.Tracer.Add(1)
s.SendDirective(d)
}
@ -82,6 +54,6 @@ func (i *SetStatement) Run(s *ponyExpress.StoreFront) {
}
// Report statisfies the Statement Interface
func (i *SetStatement) Report(s *ponyExpress.StoreFront) string {
func (i *SetStatement) Report(s *stressClient.StressTest) string {
return fmt.Sprintf("SET %v = '%v'", i.Var, i.Value)
}

View File

@ -4,7 +4,7 @@ import (
"fmt"
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestSetSetID(t *testing.T) {
@ -37,15 +37,15 @@ func TestSetRun(t *testing.T) {
func testSetRunUtl(t *testing.T, property string, value string) {
i := newTestSet(property, value)
s, _, directiveCh := ponyExpress.NewTestStoreFront()
s, _, directiveCh := stressClient.NewTestStressTest()
// Listen to the other side of the directiveCh
go func() {
for d := range directiveCh {
if i.Var != d.Property {
t.Errorf("wrong property sent to ponyExpress\n expected: %v\n got: %v\n", i.Var, d.Property)
t.Errorf("wrong property sent to stressClient\n expected: %v\n got: %v\n", i.Var, d.Property)
}
if i.Value != d.Value {
t.Errorf("wrong value sent to ponyExpress\n expected: %v\n got: %v\n", i.Value, d.Value)
t.Errorf("wrong value sent to stressClient\n expected: %v\n got: %v\n", i.Value, d.Value)
}
d.Tracer.Done()
}
@ -68,17 +68,13 @@ func testSetRunUtl(t *testing.T, property string, value string) {
}
// TODO: Actually test this
case "resultsaddress":
case "testname":
if i.Value != s.TestName {
t.Errorf("Failed to set %v\n", i.Var)
}
default:
}
}
func TestSetReport(t *testing.T) {
set := newTestSet("this", "that")
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
rpt := set.Report(s)
expected := fmt.Sprintf("SET %v = '%v'", set.Var, set.Value)
if rpt != expected {
@ -90,7 +86,7 @@ func newTestSet(toSet, value string) *SetStatement {
return &SetStatement{
Var: toSet,
Value: value,
Tracer: ponyExpress.NewTracer(make(map[string]string)),
Tracer: stressClient.NewTracer(make(map[string]string)),
StatementID: "fooID",
}
}

View File

@ -4,14 +4,14 @@ import (
"log"
"strconv"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// Statement is the common interface to shape the testing environment and prepare database requests
// The parser turns the 'statements' in the config file into Statements
type Statement interface {
Run(s *ponyExpress.StoreFront)
Report(s *ponyExpress.StoreFront) string
Run(s *stressClient.StressTest)
Report(s *stressClient.StressTest) string
SetID(s string)
}

View File

@ -4,7 +4,7 @@ import (
"fmt"
"time"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
// WaitStatement is a Statement Implementation to prevent the test from returning to early when running GoStatements
@ -20,13 +20,13 @@ func (w *WaitStatement) SetID(s string) {
}
// Run statisfies the Statement Interface
func (w *WaitStatement) Run(s *ponyExpress.StoreFront) {
func (w *WaitStatement) Run(s *stressClient.StressTest) {
runtime := time.Now()
s.Wait()
w.runtime = time.Since(runtime)
}
// Report statisfies the Statement Interface
func (w *WaitStatement) Report(s *ponyExpress.StoreFront) string {
func (w *WaitStatement) Report(s *stressClient.StressTest) string {
return fmt.Sprintf("WAIT -> %v", w.runtime)
}

View File

@ -4,7 +4,7 @@ import (
"strings"
"testing"
"github.com/influxdata/influxdb/stress/v2/ponyExpress"
"github.com/influxdata/influxdb/stress/v2/stress_client"
)
func TestWaitSetID(t *testing.T) {
@ -18,7 +18,7 @@ func TestWaitSetID(t *testing.T) {
func TestWaitRun(t *testing.T) {
e := newTestWait()
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
e.Run(s)
if e == nil {
t.Fail()
@ -27,7 +27,7 @@ func TestWaitRun(t *testing.T) {
func TestWaitReport(t *testing.T) {
e := newTestWait()
s, _, _ := ponyExpress.NewTestStoreFront()
s, _, _ := stressClient.NewTestStressTest()
rpt := e.Report(s)
if !strings.Contains(rpt, "WAIT") {
t.Fail()

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"log"
@ -37,17 +37,17 @@ func (c *commune) point(precision string) models.Point {
return p[0]
}
// SetCommune creates a new commune on the StoreFront
func (sf *StoreFront) SetCommune(name string) chan<- string {
// SetCommune creates a new commune on the StressTest
func (st *StressTest) SetCommune(name string) chan<- string {
com := newCommune(10)
sf.communes[name] = com
st.communes[name] = com
return com.ch
}
// GetPoint is called by a QueryStatement and retrieves a point sent by the associated InsertStatement
func (sf *StoreFront) GetPoint(name, precision string) models.Point {
p := sf.communes[name].point(precision)
func (st *StressTest) GetPoint(name, precision string) models.Point {
p := st.communes[name].point(precision)
// Function needs to return a point. Panic if it doesn't
if p == nil {

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"testing"
@ -33,7 +33,7 @@ func TestCommunePoint(t *testing.T) {
}
func TestSetCommune(t *testing.T) {
sf, _, _ := NewTestStoreFront()
sf, _, _ := NewTestStressTest()
ch := sf.SetCommune("foo_name")
ch <- "write,tag=tagVal fooField=5 1460912595"
pt := sf.GetPoint("foo_name", "s")

View File

@ -1,6 +1,6 @@
package ponyExpress
package stressClient
// Directive is a struct to enable communication between SetStatements and the ponyExpress backend
// Directive is a struct to enable communication between SetStatements and the stressClient backend
// Directives change state for the stress test
type Directive struct {
Property string

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"testing"

View File

@ -1,6 +1,6 @@
package ponyExpress
package stressClient
// Package is a struct to enable communication between InsertStatements, QueryStatements and InfluxQLStatements and the ponyExpress backend
// Package is a struct to enable communication between InsertStatements, QueryStatements and InfluxQLStatements and the stressClient backend
// Packages carry either writes or queries in the []byte that makes up the Body
type Package struct {
T Type

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"testing"

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"log"
@ -8,37 +8,37 @@ import (
influx "github.com/influxdata/influxdb/client/v2"
)
// reporting.go contains functions to emit tags and points from various parts of ponyExpress
// reporting.go contains functions to emit tags and points from various parts of stressClient
// These points are then written to the ("_%v", sf.TestName) database
// These are the tags that ponyExpress adds to any response points
func (pe *ponyExpress) tags(statementID string) map[string]string {
// These are the tags that stressClient adds to any response points
func (sc *stressClient) tags(statementID string) map[string]string {
tags := map[string]string{
"number_targets": fmtInt(len(pe.addresses)),
"precision": pe.precision,
"writers": fmtInt(pe.wconc),
"readers": fmtInt(pe.qconc),
"test_id": pe.testID,
"number_targets": fmtInt(len(sc.addresses)),
"precision": sc.precision,
"writers": fmtInt(sc.wconc),
"readers": fmtInt(sc.qconc),
"test_id": sc.testID,
"statement_id": statementID,
"write_interval": pe.wdelay,
"query_interval": pe.qdelay,
"write_interval": sc.wdelay,
"query_interval": sc.qdelay,
}
return tags
}
// These are the tags that the StoreFront adds to any response points
func (sf *StoreFront) tags() map[string]string {
// These are the tags that the StressTest adds to any response points
func (st *StressTest) tags() map[string]string {
tags := map[string]string{
"precision": sf.Precision,
"batch_size": fmtInt(sf.BatchSize),
"precision": st.Precision,
"batch_size": fmtInt(st.BatchSize),
}
return tags
}
// This function makes a *client.Point for reporting on writes
func (pe *ponyExpress) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point {
func (sc *stressClient) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point {
tags := sumTags(pe.tags(statementID), addedTags)
tags := sumTags(sc.tags(statementID), addedTags)
fields := map[string]interface{}{
"status_code": statusCode,
@ -56,9 +56,9 @@ func (pe *ponyExpress) writePoint(retries int, statementID string, statusCode in
}
// This function makes a *client.Point for reporting on queries
func (pe *ponyExpress) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point {
func (sc *stressClient) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point {
tags := sumTags(pe.tags(statementID), addedTags)
tags := sumTags(sc.tags(statementID), addedTags)
fields := map[string]interface{}{
"status_code": statusCode,

View File

@ -1,12 +1,12 @@
package ponyExpress
package stressClient
import (
"testing"
"time"
)
func TestNewPonyExpressTags(t *testing.T) {
pe, _, _ := newTestPonyExpress("localhost:8086")
func TestNewStressClientTags(t *testing.T) {
pe, _, _ := newTestStressClient("localhost:8086")
tags := pe.tags("foo_id")
expected := fmtInt(len(pe.addresses))
got := tags["number_targets"]
@ -30,8 +30,8 @@ func TestNewPonyExpressTags(t *testing.T) {
}
}
func TestNewStorefrontTags(t *testing.T) {
sf, _, _ := NewTestStoreFront()
func TestNewStressTestTags(t *testing.T) {
sf, _, _ := NewTestStressTest()
tags := sf.tags()
expected := sf.Precision
got := tags["precision"]
@ -46,7 +46,7 @@ func TestNewStorefrontTags(t *testing.T) {
}
func TestWritePoint(t *testing.T) {
pe, _, _ := newTestPonyExpress("localhost:8086")
pe, _, _ := newTestStressClient("localhost:8086")
statementID := "foo_id"
responseCode := 200
responseTime := time.Duration(10 * time.Millisecond)
@ -69,7 +69,7 @@ func TestWritePoint(t *testing.T) {
}
func TestQueryPoint(t *testing.T) {
pe, _, _ := newTestPonyExpress("localhost:8086")
pe, _, _ := newTestStressClient("localhost:8086")
statementID := "foo_id"
responseCode := 200
body := []byte{12}

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"log"

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"testing"

View File

@ -0,0 +1,172 @@
package stressClient
import (
"fmt"
"log"
"sync"
influx "github.com/influxdata/influxdb/client/v2"
)
// NewStressTest creates the backend for the stress test
func NewStressTest() *StressTest {
packageCh := make(chan Package, 0)
directiveCh := make(chan Directive, 0)
responseCh := make(chan Response, 0)
clnt, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: fmt.Sprintf("http://%v/", "localhost:8086"),
})
s := &StressTest{
TestDB: "_stressTest",
Precision: "s",
StartDate: "2016-01-02",
BatchSize: 5000,
packageChan: packageCh,
directiveChan: directiveCh,
ResultsClient: clnt,
ResultsChan: responseCh,
communes: make(map[string]*commune),
TestID: randStr(10),
}
// Start the client service
startStressClient(packageCh, directiveCh, responseCh, s.TestID)
// Listen for Results coming in
s.resultsListen()
return s
}
// NewTestStressTest returns a StressTest to be used for testing Statements
func NewTestStressTest() (*StressTest, chan Package, chan Directive) {
packageCh := make(chan Package, 0)
directiveCh := make(chan Directive, 0)
s := &StressTest{
TestDB: "_stressTest",
Precision: "s",
StartDate: "2016-01-02",
BatchSize: 5000,
directiveChan: directiveCh,
packageChan: packageCh,
communes: make(map[string]*commune),
TestID: randStr(10),
}
return s, packageCh, directiveCh
}
// The StressTest is the Statement facing API that consumes Statement output and coordinates the test results
type StressTest struct {
TestID string
TestDB string
Precision string
StartDate string
BatchSize int
sync.WaitGroup
sync.Mutex
packageChan chan<- Package
directiveChan chan<- Directive
ResultsChan chan Response
communes map[string]*commune
ResultsClient influx.Client
}
// SendPackage is the public facing API for to send Queries and Points
func (st *StressTest) SendPackage(p Package) {
st.packageChan <- p
}
// SendDirective is the public facing API to set state variables in the test
func (st *StressTest) SendDirective(d Directive) {
st.directiveChan <- d
}
// Starts a go routine that listens for Results
func (st *StressTest) resultsListen() {
st.createDatabase(st.TestDB)
go func() {
bp := st.NewResultsPointBatch()
for resp := range st.ResultsChan {
switch resp.Point.Name() {
case "done":
st.ResultsClient.Write(bp)
resp.Tracer.Done()
default:
// Add the StressTest tags
pt := resp.AddTags(st.tags())
// Add the point to the batch
bp = st.batcher(pt, bp)
resp.Tracer.Done()
}
}
}()
}
// NewResultsPointBatch creates a new batch of points for the results
func (st *StressTest) NewResultsPointBatch() influx.BatchPoints {
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: st.TestDB,
Precision: "ns",
})
return bp
}
// Batches incoming Result.Point and sends them if the batch reaches 5k in size
func (st *StressTest) batcher(pt *influx.Point, bp influx.BatchPoints) influx.BatchPoints {
if len(bp.Points()) <= 5000 {
bp.AddPoint(pt)
} else {
err := st.ResultsClient.Write(bp)
if err != nil {
log.Fatalf("Error writing performance stats\n error: %v\n", err)
}
bp = st.NewResultsPointBatch()
}
return bp
}
// Convinence database creation function
func (st *StressTest) createDatabase(db string) {
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v", db)
res, err := st.ResultsClient.Query(influx.Query{Command: query})
if err != nil {
log.Fatalf("error: no running influx server at localhost:8086")
if res.Error() != nil {
log.Fatalf("error: no running influx server at localhost:8086")
}
}
}
// GetStatementResults is a convinence function for fetching all results given a StatementID
func (st *StressTest) GetStatementResults(sID, t string) (res []influx.Result) {
qryStr := fmt.Sprintf(`SELECT * FROM "%v" WHERE statement_id = '%v'`, t, sID)
return st.queryTestResults(qryStr)
}
// Runs given qry on the test results database and returns the results or nil in case of error
func (st *StressTest) queryTestResults(qry string) (res []influx.Result) {
response, err := st.ResultsClient.Query(influx.Query{Command: qry, Database: st.TestDB})
if err == nil {
if response.Error() != nil {
log.Fatalf("Error sending results query\n error: %v\n", response.Error())
}
}
if response.Results[0].Series == nil {
return nil
}
return response.Results
}

View File

@ -1,7 +1,6 @@
package ponyExpress
package stressClient
import (
"fmt"
"testing"
"time"
@ -18,15 +17,15 @@ func NewBlankTestPoint() *influx.Point {
return pt
}
func TestStoreFrontBatcher(t *testing.T) {
sf, _, _ := NewTestStoreFront()
func TestStressTestBatcher(t *testing.T) {
sf, _, _ := NewTestStressTest()
bpconf := influx.BatchPointsConfig{
Database: fmt.Sprintf("_%v", sf.TestName),
Database: sf.TestDB,
Precision: "ns",
}
bp, _ := influx.NewBatchPoints(bpconf)
pt := NewBlankTestPoint()
bp = sf.batcher(pt, bp, bpconf)
bp = sf.batcher(pt, bp)
if len(bp.Points()) != 1 {
t.Fail()
}

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"strings"
@ -14,9 +14,9 @@ const (
Query
)
func startPonyExpress(packageCh <-chan Package, directiveCh <-chan Directive, responseCh chan<- Response, testID string) {
func startStressClient(packageCh <-chan Package, directiveCh <-chan Directive, responseCh chan<- Response, testID string) {
c := &ponyExpress{
c := &stressClient{
testID: testID,
addresses: []string{"localhost:8086"},
@ -39,12 +39,11 @@ func startPonyExpress(packageCh <-chan Package, directiveCh <-chan Directive, re
}
// start listening for writes and queries
go c.listen()
// start listening for state changes
go c.directiveListen()
}
type ponyExpress struct {
type stressClient struct {
testID string
// State for the Stress Test
@ -78,11 +77,11 @@ type ponyExpress struct {
rc *ConcurrencyLimiter
}
// NewTestPonyExpress returns a blank ponyExpress for testing
func newTestPonyExpress(url string) (*ponyExpress, chan Directive, chan Package) {
// NewTestStressClient returns a blank stressClient for testing
func newTestStressClient(url string) (*stressClient, chan Directive, chan Package) {
pkgChan := make(chan Package)
dirChan := make(chan Directive)
pe := &ponyExpress{
pe := &stressClient{
testID: "foo_id",
addresses: []string{url},
precision: "s",
@ -103,30 +102,22 @@ func newTestPonyExpress(url string) (*ponyExpress, chan Directive, chan Package)
return pe, dirChan, pkgChan
}
// client starts listening for Packages on the main channel
func (pe *ponyExpress) listen() {
defer pe.Wait()
// Keep track of number of concurrent readers and writers seperately
pe.wc = NewConcurrencyLimiter(pe.wconc)
pe.rc = NewConcurrencyLimiter(pe.qconc)
// Manage overall number of goroutines and keep at 2 x (wconc + qconc)
l := NewConcurrencyLimiter((pe.wconc + pe.qconc) * 2)
// Concume incoming packages
// stressClient starts listening for Packages on the main channel
func (sc *stressClient) listen() {
defer sc.Wait()
sc.wc = NewConcurrencyLimiter(sc.wconc)
sc.rc = NewConcurrencyLimiter(sc.qconc)
l := NewConcurrencyLimiter((sc.wconc + sc.qconc) * 2)
counter := 0
for p := range pe.packageChan {
serv := counter % len(pe.addresses)
for p := range sc.packageChan {
l.Increment()
go func(p Package) {
defer l.Decrement()
switch p.T {
case Write:
pe.spinOffWritePackage(p, serv)
sc.spinOffWritePackage(p, (counter % len(sc.addresses)))
case Query:
pe.spinOffQueryPackage(p, serv)
sc.spinOffQueryPackage(p, (counter % len(sc.addresses)))
}
}(p)
counter++
@ -135,64 +126,50 @@ func (pe *ponyExpress) listen() {
}
// Set handles all SET requests for test state
func (pe *ponyExpress) directiveListen() {
for d := range pe.directiveChan {
pe.Lock()
func (sc *stressClient) directiveListen() {
for d := range sc.directiveChan {
sc.Lock()
switch d.Property {
// addresses is a []string of target InfluxDB instance(s) for the test
// comes in as a "|" seperated array of addresses
case "addresses":
addr := strings.Split(d.Value, "|")
pe.addresses = addr
sc.addresses = addr
// percison is the write precision for InfluxDB
case "precision":
pe.precision = d.Value
sc.precision = d.Value
// writeinterval is an optional delay between batches
case "writeinterval":
pe.wdelay = d.Value
sc.wdelay = d.Value
// queryinterval is an optional delay between the batches
case "queryinterval":
pe.qdelay = d.Value
sc.qdelay = d.Value
// database is the InfluxDB database to target for both writes and queries
case "database":
pe.database = d.Value
sc.database = d.Value
// username for the target database
case "username":
pe.username = d.Value
sc.username = d.Value
// username for the target database
case "password":
pe.password = d.Value
// use https if the there is a value for ssl
sc.password = d.Value
// use https if sent true
case "ssl":
if d.Value == "true" {
pe.ssl = true
sc.ssl = true
}
// concurrency is the number concurrent writers to the database
// concurrency is the number concurrent writers to the database
case "writeconcurrency":
conc := parseInt(d.Value)
pe.wconc = conc
// Reset the ConcurrencyLimiter
pe.wc.NewMax(conc)
// concurrentqueries is the number of concurrent queries to run against the database
sc.wconc = conc
sc.wc.NewMax(conc)
// concurrentqueries is the number of concurrent queriers database
case "queryconcurrency":
conc := parseInt(d.Value)
pe.qconc = conc
// Reset the ConcurrencyLimiter
pe.rc.NewMax(conc)
sc.qconc = conc
sc.rc.NewMax(conc)
}
// Decrement the tracker
d.Tracer.Done()
pe.Unlock()
sc.Unlock()
}
}

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"fmt"
@ -9,38 +9,38 @@ import (
"time"
)
func (pe *ponyExpress) spinOffQueryPackage(p Package, serv int) {
pe.Add(1)
pe.rc.Increment()
func (sc *stressClient) spinOffQueryPackage(p Package, serv int) {
sc.Add(1)
sc.rc.Increment()
go func() {
// Send the query
pe.prepareQuerySend(p, serv)
pe.Done()
pe.rc.Decrement()
sc.prepareQuerySend(p, serv)
sc.Done()
sc.rc.Decrement()
}()
}
// Prepares to send the GET request
func (pe *ponyExpress) prepareQuerySend(p Package, serv int) {
func (sc *stressClient) prepareQuerySend(p Package, serv int) {
var queryTemplate string
if pe.ssl {
if sc.ssl {
queryTemplate = "https://%v/query?db=%v&q=%v&u=%v&p=%v"
} else {
queryTemplate = "http://%v/query?db=%v&q=%v&u=%v&p=%v"
}
queryURL := fmt.Sprintf(queryTemplate, pe.addresses[serv], pe.database, url.QueryEscape(string(p.Body)), pe.username, pe.password)
queryURL := fmt.Sprintf(queryTemplate, sc.addresses[serv], sc.database, url.QueryEscape(string(p.Body)), sc.username, sc.password)
// Send the query
pe.makeGet(queryURL, p.StatementID, p.Tracer)
sc.makeGet(queryURL, p.StatementID, p.Tracer)
// Query Interval enforcement
qi, _ := time.ParseDuration(pe.qdelay)
qi, _ := time.ParseDuration(sc.qdelay)
time.Sleep(qi)
}
// Sends the GET request, reads it, and handles errors
func (pe *ponyExpress) makeGet(addr, statementID string, tr *Tracer) {
func (sc *stressClient) makeGet(addr, statementID string, tr *Tracer) {
// Make GET request
t := time.Now()
@ -65,7 +65,7 @@ func (pe *ponyExpress) makeGet(addr, statementID string, tr *Tracer) {
}
// Send the response
pe.responseChan <- NewResponse(pe.queryPoint(statementID, body, resp.StatusCode, elapsed, tr.Tags), tr)
sc.responseChan <- NewResponse(sc.queryPoint(statementID, body, resp.StatusCode, elapsed, tr.Tags), tr)
}
func success(r *http.Response) bool {

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"bytes"
@ -14,18 +14,18 @@ import (
// ###############################################
// Packages up Package from channel in goroutine
func (pe *ponyExpress) spinOffWritePackage(p Package, serv int) {
pe.Add(1)
pe.wc.Increment()
func (sc *stressClient) spinOffWritePackage(p Package, serv int) {
sc.Add(1)
sc.wc.Increment()
go func() {
pe.retry(p, time.Duration(time.Nanosecond), serv)
pe.Done()
pe.wc.Decrement()
sc.retry(p, time.Duration(time.Nanosecond), serv)
sc.Done()
sc.wc.Decrement()
}()
}
// Implements backoff and retry logic for 500 responses
func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) {
func (sc *stressClient) retry(p Package, backoff time.Duration, serv int) {
// Set Backoff Interval to 500ms
backoffInterval := time.Duration(500 * time.Millisecond)
@ -34,7 +34,7 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) {
bo := backoff + backoffInterval
// Make the write request
resp, elapsed, err := pe.prepareWrite(p.Body, serv)
resp, elapsed, err := sc.prepareWrite(p.Body, serv)
// Find number of times request has been retried
numBackoffs := int(bo/backoffInterval) - 1
@ -49,13 +49,13 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) {
}
// Make a point for reporting
point := pe.writePoint(numBackoffs, p.StatementID, statusCode, elapsed, p.Tracer.Tags, len(p.Body))
point := sc.writePoint(numBackoffs, p.StatementID, statusCode, elapsed, p.Tracer.Tags, len(p.Body))
// Send the Response(point, tracer)
pe.responseChan <- NewResponse(point, p.Tracer)
sc.responseChan <- NewResponse(point, p.Tracer)
// BatchInterval enforcement
bi, _ := time.ParseDuration(pe.wdelay)
bi, _ := time.ParseDuration(sc.wdelay)
time.Sleep(bi)
// Retry if the statusCode was not 204 or the err != nil
@ -66,22 +66,22 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) {
fmt.Println(err)
// Backoff enforcement
time.Sleep(bo)
pe.retry(p, bo, serv)
sc.retry(p, bo, serv)
}
}
// Prepares to send the POST request
func (pe *ponyExpress) prepareWrite(points []byte, serv int) (*http.Response, time.Duration, error) {
func (sc *stressClient) prepareWrite(points []byte, serv int) (*http.Response, time.Duration, error) {
// Construct address string
var writeTemplate string
if pe.ssl {
if sc.ssl {
writeTemplate = "https://%v/write?db=%v&precision=%v&u=%v&p=%v"
} else {
writeTemplate = "http://%v/write?db=%v&precision=%v&u=%v&p=%v"
}
address := fmt.Sprintf(writeTemplate, pe.addresses[serv], pe.database, pe.precision, pe.username, pe.password)
address := fmt.Sprintf(writeTemplate, sc.addresses[serv], sc.database, sc.precision, sc.username, sc.password)
// Start timer
t := time.Now()

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"sync"

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"testing"

View File

@ -1,4 +1,4 @@
package ponyExpress
package stressClient
import (
"crypto/rand"

View File

@ -4,7 +4,7 @@ import "testing"
// Pulls the default configFile and makes sure it parses
func TestParseStatements(t *testing.T) {
stmts, err := ParseStatements("../file.iql")
stmts, err := ParseStatements("../iql/file.iql")
if err != nil {
t.Error(err)
}