258 lines
6.5 KiB
Go
258 lines
6.5 KiB
Go
package influxql_test
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/influxql"
|
|
)
|
|
|
|
var errUnexpected = errors.New("unexpected error")
|
|
|
|
type StatementExecutor struct {
|
|
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
|
|
}
|
|
|
|
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
return e.ExecuteStatementFn(stmt, ctx)
|
|
}
|
|
|
|
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error {
|
|
return nil
|
|
}
|
|
|
|
func TestQueryExecutor_AttachQuery(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
if ctx.QueryID != 1 {
|
|
t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID)
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
|
|
discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
|
|
}
|
|
|
|
func TestQueryExecutor_KillQuery(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
qid := make(chan uint64)
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
qid <- ctx.QueryID
|
|
select {
|
|
case <-ctx.InterruptCh:
|
|
return influxql.ErrQueryInterrupted
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("killing the query did not close the channel after 100 milliseconds")
|
|
return errUnexpected
|
|
}
|
|
},
|
|
}
|
|
|
|
results := e.ExecuteQuery(q, "mydb", 100, nil)
|
|
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
|
|
|
|
result := <-results
|
|
if result.Err != influxql.ErrQueryInterrupted {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
}
|
|
|
|
func TestQueryExecutor_Interrupt(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
select {
|
|
case <-ctx.InterruptCh:
|
|
return influxql.ErrQueryInterrupted
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("killing the query did not close the channel after 100 milliseconds")
|
|
return errUnexpected
|
|
}
|
|
},
|
|
}
|
|
|
|
closing := make(chan struct{})
|
|
results := e.ExecuteQuery(q, "mydb", 100, closing)
|
|
close(closing)
|
|
result := <-results
|
|
if result.Err != influxql.ErrQueryInterrupted {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
}
|
|
|
|
func TestQueryExecutor_ShowQueries(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
t.Errorf("unexpected statement: %s", stmt)
|
|
return errUnexpected
|
|
},
|
|
}
|
|
|
|
q, err = influxql.ParseQuery(`SHOW QUERIES`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
results := e.ExecuteQuery(q, "", 100, nil)
|
|
result := <-results
|
|
if len(result.Series) != 1 {
|
|
t.Errorf("expected %d rows, got %d", 1, len(result.Series))
|
|
}
|
|
if result.Err != nil {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
}
|
|
|
|
func TestQueryExecutor_Limit_Timeout(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
select {
|
|
case <-ctx.InterruptCh:
|
|
return influxql.ErrQueryInterrupted
|
|
case <-time.After(time.Second):
|
|
t.Errorf("timeout has not killed the query")
|
|
return errUnexpected
|
|
}
|
|
},
|
|
}
|
|
e.QueryTimeout = time.Nanosecond
|
|
|
|
results := e.ExecuteQuery(q, "mydb", 100, nil)
|
|
result := <-results
|
|
if result.Err != influxql.ErrQueryTimeoutReached {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
}
|
|
|
|
func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
qid := make(chan uint64)
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
qid <- ctx.QueryID
|
|
<-ctx.InterruptCh
|
|
return influxql.ErrQueryInterrupted
|
|
},
|
|
}
|
|
e.MaxConcurrentQueries = 1
|
|
defer e.Close()
|
|
|
|
// Start first query and wait for it to be executing.
|
|
go discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
|
|
<-qid
|
|
|
|
// Start second query and expect for it to fail.
|
|
results := e.ExecuteQuery(q, "mydb", 100, nil)
|
|
|
|
select {
|
|
case result := <-results:
|
|
if len(result.Series) != 0 {
|
|
t.Errorf("expected %d rows, got %d", 0, len(result.Series))
|
|
}
|
|
if result.Err != influxql.ErrMaxConcurrentQueriesReached {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
case <-qid:
|
|
t.Errorf("unexpected statement execution for the second query")
|
|
}
|
|
}
|
|
|
|
func TestQueryExecutor_Close(t *testing.T) {
|
|
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
ch1 := make(chan struct{})
|
|
ch2 := make(chan struct{})
|
|
|
|
e := influxql.NewQueryExecutor()
|
|
e.StatementExecutor = &StatementExecutor{
|
|
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
|
|
close(ch1)
|
|
<-ctx.InterruptCh
|
|
close(ch2)
|
|
return influxql.ErrQueryInterrupted
|
|
},
|
|
}
|
|
|
|
results := e.ExecuteQuery(q, "mydb", 100, nil)
|
|
go func(results <-chan *influxql.Result) {
|
|
result := <-results
|
|
if result.Err != influxql.ErrQueryEngineShutdown {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
}(results)
|
|
|
|
// Wait for the statement to start executing.
|
|
<-ch1
|
|
|
|
// Close the query executor.
|
|
e.Close()
|
|
|
|
// Check that the statement gets interrupted and finishes.
|
|
select {
|
|
case <-ch2:
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("closing the query manager did not kill the query after 100 milliseconds")
|
|
}
|
|
|
|
results = e.ExecuteQuery(q, "mydb", 100, nil)
|
|
result := <-results
|
|
if len(result.Series) != 0 {
|
|
t.Errorf("expected %d rows, got %d", 0, len(result.Series))
|
|
}
|
|
if result.Err != influxql.ErrQueryEngineShutdown {
|
|
t.Errorf("unexpected error: %s", result.Err)
|
|
}
|
|
}
|
|
|
|
func discardOutput(results <-chan *influxql.Result) {
|
|
for range results {
|
|
// Read all results and discard.
|
|
}
|
|
}
|