Cleanup QueryExecutor and split statement execution code

The QueryExecutor had a lot of dead code made obsolete by the query
engine refactor that has now been removed. The TSDBStore interface has
also been cleaned up so we can have multiple implementations of this
(such as a local and remote version).

A StatementExecutor interface has been created for adding custom
functionality to the QueryExecutor that may not be available in the open
source version. The QueryExecutor delegate all statement execution to
the StatementExecutor and the QueryExecutor will only keep track of
housekeeping. Implementing additional queries is as simple as wrapping
the cluster.StatementExecutor struct or replacing it with something
completely different.

The PointsWriter in the QueryExecutor has been changed to a simple
interface that implements the one method needed by the query executor.
This is to allow different PointsWriter implementations to be used by
the QueryExecutor. It has also been moved into the StatementExecutor
instead.

The TSDBStore interface has now been modified to contain the code for
creating an IteratorCreator. This is so the underlying TSDBStore can
implement different ways of accessing the underlying shards rather than
always having to access each shard individually (such as batch
requests).

Remove the show servers handling. This isn't a valid command in the open
source version of InfluxDB anymore.

The QueryManager interface is now built into QueryExecutor and is no
longer necessary. The StatementExecutor and QueryExecutor split allows
task management to much more easily be built into QueryExecutor rather
than as a separate struct.
pull/6187/head
Jonathan A. Sternberg 2016-03-31 18:12:29 -04:00
parent fe635e6e47
commit 37b63cedec
15 changed files with 1036 additions and 1161 deletions

View File

@ -3,6 +3,7 @@ package cluster
import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/toml"
)
@ -16,10 +17,6 @@ const (
// DefaultShardMapperTimeout is the default timeout set on shard mappers.
DefaultShardMapperTimeout = 5 * time.Second
// DefaultQueryTimeout is the default timeout for executing a query.
// A value of zero will have no query timeout.
DefaultQueryTimeout = time.Duration(0)
// DefaultMaxRemoteWriteConnections is the maximum number of open connections
// that will be available for remote writes to another host.
DefaultMaxRemoteWriteConnections = 3
@ -57,7 +54,7 @@ func NewConfig() Config {
WriteTimeout: toml.Duration(DefaultWriteTimeout),
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout),
QueryTimeout: toml.Duration(DefaultQueryTimeout),
QueryTimeout: toml.Duration(influxql.DefaultQueryTimeout),
MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections,
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
MaxSelectPointN: DefaultMaxSelectPointN,

View File

@ -3,6 +3,7 @@ package cluster
import (
"encoding"
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
@ -325,10 +326,15 @@ func (s *Service) processCreateIteratorRequest(conn net.Conn) {
return err
}
sh, ok := s.TSDBStore.(ShardIteratorCreator)
if !ok {
return errors.New("unable to access a specific shard with this tsdb store")
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
ic := sh.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
@ -394,10 +400,15 @@ func (s *Service) processFieldDimensionsRequest(conn net.Conn) {
return err
}
sh, ok := s.TSDBStore.(ShardIteratorCreator)
if !ok {
return errors.New("unable to access a specific shard with this tsdb store")
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
ic := sh.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
@ -437,10 +448,15 @@ func (s *Service) processSeriesKeysRequest(conn net.Conn) {
return err
}
sh, ok := s.TSDBStore.(ShardIteratorCreator)
if !ok {
return errors.New("unable to access a specific shard with this tsdb store")
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
ic := sh.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}

View File

@ -3,11 +3,7 @@ package cluster
import (
"bytes"
"errors"
"expvar"
"fmt"
"io"
"io/ioutil"
"log"
"sort"
"strconv"
"time"
@ -19,12 +15,8 @@ import (
"github.com/influxdata/influxdb/services/meta"
)
// A QueryExecutor is responsible for processing a influxql.Query and
// executing all of the statements within, on nodes in a cluster.
type QueryExecutor struct {
// Reference to local node.
Node *influxdb.Node
// StatementExecutor executes a statement in the query.
type StatementExecutor struct {
MetaClient MetaClient
// TSDB storage for local node.
@ -34,223 +26,99 @@ type QueryExecutor struct {
Monitor *monitor.Monitor
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter
// Used for managing and tracking running queries.
QueryManager influxql.QueryManager
// Query execution timeout.
QueryTimeout time.Duration
PointsWriter interface {
WritePointsInto(*IntoWriteRequest) error
}
// Select statement limits
MaxSelectPointN int
MaxSelectSeriesN int
MaxSelectBucketsN int
// Remote execution timeout
Timeout time.Duration
// Output of all logging.
// Defaults to discarding all log output.
LogOutput io.Writer
// expvar-based stats.
statMap *expvar.Map
}
// Statistics for the QueryExecutor
const (
statQueriesActive = "queriesActive" // Number of queries currently being executed
statQueryExecutionDuration = "queryDurationNs" // Total (wall) time spent executing queries
)
// NewQueryExecutor returns a new instance of QueryExecutor.
func NewQueryExecutor() *QueryExecutor {
return &QueryExecutor{
Timeout: DefaultShardMapperTimeout,
QueryTimeout: DefaultQueryTimeout,
LogOutput: ioutil.Discard,
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
return e.executeSelectStatement(stmt, ctx)
}
var rows models.Rows
var err error
switch stmt := stmt.(type) {
case *influxql.AlterRetentionPolicyStatement:
err = e.executeAlterRetentionPolicyStatement(stmt)
case *influxql.CreateContinuousQueryStatement:
err = e.executeCreateContinuousQueryStatement(stmt)
case *influxql.CreateDatabaseStatement:
err = e.executeCreateDatabaseStatement(stmt)
case *influxql.CreateRetentionPolicyStatement:
err = e.executeCreateRetentionPolicyStatement(stmt)
case *influxql.CreateSubscriptionStatement:
err = e.executeCreateSubscriptionStatement(stmt)
case *influxql.CreateUserStatement:
err = e.executeCreateUserStatement(stmt)
case *influxql.DropContinuousQueryStatement:
err = e.executeDropContinuousQueryStatement(stmt)
case *influxql.DropDatabaseStatement:
err = e.executeDropDatabaseStatement(stmt)
case *influxql.DropMeasurementStatement:
err = e.executeDropMeasurementStatement(stmt, ctx.Database)
case *influxql.DropSeriesStatement:
err = e.executeDropSeriesStatement(stmt, ctx.Database)
case *influxql.DropRetentionPolicyStatement:
err = e.executeDropRetentionPolicyStatement(stmt)
case *influxql.DropShardStatement:
err = e.executeDropShardStatement(stmt)
case *influxql.DropSubscriptionStatement:
err = e.executeDropSubscriptionStatement(stmt)
case *influxql.DropUserStatement:
err = e.executeDropUserStatement(stmt)
case *influxql.GrantStatement:
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
err = e.executeGrantAdminStatement(stmt)
case *influxql.RevokeStatement:
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
err = e.executeRevokeAdminStatement(stmt)
case *influxql.ShowContinuousQueriesStatement:
rows, err = e.executeShowContinuousQueriesStatement(stmt)
case *influxql.ShowDatabasesStatement:
rows, err = e.executeShowDatabasesStatement(stmt)
case *influxql.ShowDiagnosticsStatement:
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowShardsStatement:
rows, err = e.executeShowShardsStatement(stmt)
case *influxql.ShowShardGroupsStatement:
rows, err = e.executeShowShardGroupsStatement(stmt)
case *influxql.ShowStatsStatement:
rows, err = e.executeShowStatsStatement(stmt)
case *influxql.ShowSubscriptionsStatement:
rows, err = e.executeShowSubscriptionsStatement(stmt)
case *influxql.ShowUsersStatement:
rows, err = e.executeShowUsersStatement(stmt)
case *influxql.SetPasswordUserStatement:
err = e.executeSetPasswordUserStatement(stmt)
default:
return influxql.ErrInvalidQuery
}
if err != nil {
return err
}
ctx.Results <- &influxql.Result{
StatementID: ctx.StatementID,
Series: rows,
}
return nil
}
// ExecuteQuery executes each statement within a query.
func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
results := make(chan *influxql.Result)
go e.executeQuery(query, database, chunkSize, closing, results)
return results
}
func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chunkSize int, closing <-chan struct{}, results chan *influxql.Result) {
defer close(results)
e.statMap.Add(statQueriesActive, 1)
defer func(start time.Time) {
e.statMap.Add(statQueriesActive, -1)
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())
qerr := &influxql.QueryError{}
var qid uint64
if e.QueryManager != nil {
var err error
qid, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
Timeout: e.QueryTimeout,
InterruptCh: closing,
Error: qerr,
})
if err != nil {
results <- &influxql.Result{Err: err}
return
}
defer e.QueryManager.KillQuery(qid)
}
logger := e.logger()
var i int
for ; i < len(query.Statements); i++ {
stmt := query.Statements[i]
// If a default database wasn't passed in by the caller, check the statement.
defaultDB := database
if defaultDB == "" {
if s, ok := stmt.(influxql.HasDefaultDatabase); ok {
defaultDB = s.DefaultDatabase()
}
}
// Rewrite statements, if necessary.
// This can occur on meta read statements which convert to SELECT statements.
newStmt, err := influxql.RewriteStatement(stmt)
if err != nil {
results <- &influxql.Result{Err: err}
break
}
stmt = newStmt
// Normalize each statement.
if err := e.normalizeStatement(stmt, defaultDB); err != nil {
results <- &influxql.Result{Err: err}
break
}
// Log each normalized statement.
logger.Println(stmt.String())
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
if err := e.executeSelectStatement(stmt, chunkSize, i, qid, results, closing); err != nil {
if err == influxql.ErrQueryInterrupted {
err = qerr.Error()
}
results <- &influxql.Result{StatementID: i, Err: err}
break
}
continue
}
var rows models.Rows
switch stmt := stmt.(type) {
case *influxql.AlterRetentionPolicyStatement:
err = e.executeAlterRetentionPolicyStatement(stmt)
case *influxql.CreateContinuousQueryStatement:
err = e.executeCreateContinuousQueryStatement(stmt)
case *influxql.CreateDatabaseStatement:
err = e.executeCreateDatabaseStatement(stmt)
case *influxql.CreateRetentionPolicyStatement:
err = e.executeCreateRetentionPolicyStatement(stmt)
case *influxql.CreateSubscriptionStatement:
err = e.executeCreateSubscriptionStatement(stmt)
case *influxql.CreateUserStatement:
err = e.executeCreateUserStatement(stmt)
case *influxql.DropContinuousQueryStatement:
err = e.executeDropContinuousQueryStatement(stmt)
case *influxql.DropDatabaseStatement:
err = e.executeDropDatabaseStatement(stmt)
case *influxql.DropMeasurementStatement:
err = e.executeDropMeasurementStatement(stmt, database)
case *influxql.DropSeriesStatement:
err = e.executeDropSeriesStatement(stmt, database)
case *influxql.DropRetentionPolicyStatement:
err = e.executeDropRetentionPolicyStatement(stmt)
case *influxql.DropServerStatement:
err = influxql.ErrInvalidQuery
case *influxql.DropShardStatement:
err = e.executeDropShardStatement(stmt)
case *influxql.DropSubscriptionStatement:
err = e.executeDropSubscriptionStatement(stmt)
case *influxql.DropUserStatement:
err = e.executeDropUserStatement(stmt)
case *influxql.GrantStatement:
err = e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
err = e.executeGrantAdminStatement(stmt)
case *influxql.KillQueryStatement:
err = e.executeKillQueryStatement(stmt)
case *influxql.RevokeStatement:
err = e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
err = e.executeRevokeAdminStatement(stmt)
case *influxql.ShowContinuousQueriesStatement:
rows, err = e.executeShowContinuousQueriesStatement(stmt)
case *influxql.ShowDatabasesStatement:
rows, err = e.executeShowDatabasesStatement(stmt)
case *influxql.ShowDiagnosticsStatement:
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowQueriesStatement:
rows, err = e.executeShowQueriesStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowServersStatement:
// TODO: corylanou add this back for single node
err = influxql.ErrInvalidQuery
case *influxql.ShowShardsStatement:
rows, err = e.executeShowShardsStatement(stmt)
case *influxql.ShowShardGroupsStatement:
rows, err = e.executeShowShardGroupsStatement(stmt)
case *influxql.ShowStatsStatement:
rows, err = e.executeShowStatsStatement(stmt)
case *influxql.ShowSubscriptionsStatement:
rows, err = e.executeShowSubscriptionsStatement(stmt)
case *influxql.ShowTagValuesStatement:
rows, err = e.executeShowTagValuesStatement(stmt, database)
case *influxql.ShowUsersStatement:
rows, err = e.executeShowUsersStatement(stmt)
case *influxql.SetPasswordUserStatement:
err = e.executeSetPasswordUserStatement(stmt)
default:
err = influxql.ErrInvalidQuery
}
// Send results for each statement.
results <- &influxql.Result{
StatementID: i,
Series: rows,
Err: err,
}
// Stop after the first error.
if err != nil {
break
}
}
// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
results <- &influxql.Result{
StatementID: i,
Err: influxql.ErrNotExecuted,
}
}
}
func (e *QueryExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
rpu := &meta.RetentionPolicyUpdate{
Duration: stmt.Duration,
ReplicaN: stmt.Replication,
@ -272,11 +140,11 @@ func (e *QueryExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.Alte
return nil
}
func (e *QueryExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) error {
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) error {
return e.MetaClient.CreateContinuousQuery(q.Database, q.Name, q.String())
}
func (e *QueryExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
if !stmt.RetentionPolicyCreate {
_, err := e.MetaClient.CreateDatabase(stmt.Name)
return err
@ -290,7 +158,7 @@ func (e *QueryExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateData
return err
}
func (e *QueryExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {
rpi := meta.NewRetentionPolicyInfo(stmt.Name)
rpi.Duration = stmt.Duration
rpi.ReplicaN = stmt.Replication
@ -310,23 +178,23 @@ func (e *QueryExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.Cre
return nil
}
func (e *QueryExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) error {
func (e *StatementExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) error {
return e.MetaClient.CreateSubscription(q.Database, q.RetentionPolicy, q.Name, q.Mode, q.Destinations)
}
func (e *QueryExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) error {
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) error {
_, err := e.MetaClient.CreateUser(q.Name, q.Password, q.Admin)
return err
}
func (e *QueryExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
return e.MetaClient.DropContinuousQuery(q.Database, q.Name)
}
// executeDropDatabaseStatement drops a database from the cluster.
// It does not return an error if the database was not found on any of
// the nodes, or in the Meta store.
func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error {
func (e *StatementExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error {
// Remove the database from the Meta Store.
if err := e.MetaClient.DropDatabase(stmt.Name); err != nil {
return err
@ -336,7 +204,7 @@ func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabase
return e.TSDBStore.DeleteDatabase(stmt.Name)
}
func (e *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error {
func (e *StatementExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error {
if dbi, err := e.MetaClient.Database(database); err != nil {
return err
} else if dbi == nil {
@ -347,7 +215,7 @@ func (e *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasu
return e.TSDBStore.DeleteMeasurement(database, stmt.Name)
}
func (e *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) error {
func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) error {
if dbi, err := e.MetaClient.Database(database); err != nil {
return err
} else if dbi == nil {
@ -363,7 +231,7 @@ func (e *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
}
func (e *QueryExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
// Remove the shard reference from the Meta Store.
if err := e.MetaClient.DropShard(stmt.ID); err != nil {
return err
@ -373,7 +241,7 @@ func (e *QueryExecutor) executeDropShardStatement(stmt *influxql.DropShardStatem
return e.TSDBStore.DeleteShard(stmt.ID)
}
func (e *QueryExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropRetentionPolicyStatement) error {
func (e *StatementExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropRetentionPolicyStatement) error {
if err := e.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name); err != nil {
return err
}
@ -382,30 +250,23 @@ func (e *QueryExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropR
return e.TSDBStore.DeleteRetentionPolicy(stmt.Database, stmt.Name)
}
func (e *QueryExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) error {
func (e *StatementExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) error {
return e.MetaClient.DropSubscription(q.Database, q.RetentionPolicy, q.Name)
}
func (e *QueryExecutor) executeDropUserStatement(q *influxql.DropUserStatement) error {
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) error {
return e.MetaClient.DropUser(q.Name)
}
func (e *QueryExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
return e.MetaClient.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
}
func (e *QueryExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) error {
func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) error {
return e.MetaClient.SetAdminPrivilege(stmt.User, true)
}
func (e *QueryExecutor) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error {
if e.QueryManager == nil {
return influxql.ErrNoQueryManager
}
return e.QueryManager.KillQuery(stmt.QueryID)
}
func (e *QueryExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
priv := influxql.NoPrivileges
// Revoking all privileges means there's no need to look at existing user privileges.
@ -421,18 +282,18 @@ func (e *QueryExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) e
return e.MetaClient.SetPrivilege(stmt.User, stmt.On, priv)
}
func (e *QueryExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) error {
func (e *StatementExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) error {
return e.MetaClient.SetAdminPrivilege(stmt.User, false)
}
func (e *QueryExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) error {
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) error {
return e.MetaClient.UpdateUser(q.Name, q.Password)
}
func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, chunkSize, statementID int, qid uint64, results chan *influxql.Result, closing <-chan struct{}) error {
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{InterruptCh: closing}
opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh}
// Replace instances of "now()" with the current time, and check the resultant times.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
@ -502,13 +363,13 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
return err
}
if qid != 0 && e.MaxSelectPointN > 0 {
if e.MaxSelectPointN > 0 {
monitor := influxql.PointLimitMonitor(itrs, influxql.DefaultStatsInterval, e.MaxSelectPointN)
e.QueryManager.MonitorQuery(qid, monitor)
ctx.Query.Monitor(monitor)
}
// Generate a row emitter from the iterator set.
em := influxql.NewEmitter(itrs, stmt.TimeAscending(), chunkSize)
em := influxql.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
em.Columns = stmt.ColumnNames()
em.OmitTime = stmt.OmitTime
defer em.Close()
@ -527,7 +388,7 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-closing:
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
default:
}
@ -535,7 +396,7 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
}
result := &influxql.Result{
StatementID: statementID,
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}
@ -550,9 +411,9 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
// Send results or exit if closing.
select {
case <-closing:
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case results <- result:
case ctx.Results <- result:
}
emitted = true
@ -560,8 +421,8 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
// Emit write count if an INTO statement.
if stmt.Target != nil {
results <- &influxql.Result{
StatementID: statementID,
ctx.Results <- &influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{{
Name: "result",
Columns: []string{"time", "written"},
@ -573,8 +434,8 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
// Always emit at least one result.
if !emitted {
results <- &influxql.Result{
StatementID: statementID,
ctx.Results <- &influxql.Result{
StatementID: ctx.StatementID,
Series: make([]*models.Row, 0),
}
}
@ -583,34 +444,21 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
}
// iteratorCreator returns a new instance of IteratorCreator based on stmt.
func (e *QueryExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Retrieve a list of shard IDs.
shards, err := e.MetaClient.ShardsByTimeRange(stmt.Sources, opt.MinTime, opt.MaxTime)
if err != nil {
return nil, err
}
// Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0)
if err := func() error {
for _, shard := range shards {
ic := e.TSDBStore.ShardIteratorCreator(shard.ID)
if ic == nil {
continue
}
ics = append(ics, ic)
}
return nil
}(); err != nil {
influxql.IteratorCreators(ics).Close()
return nil, err
shardIDs := make([]uint64, len(shards))
for i, sh := range shards {
shardIDs[i] = sh.ID
}
return influxql.IteratorCreators(ics), nil
return e.TSDBStore.IteratorCreator(shardIDs)
}
func (e *QueryExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
dis, err := e.MetaClient.Databases()
if err != nil {
return nil, err
@ -627,7 +475,7 @@ func (e *QueryExecutor) executeShowContinuousQueriesStatement(stmt *influxql.Sho
return rows, nil
}
func (e *QueryExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) (models.Rows, error) {
dis, err := e.MetaClient.Databases()
if err != nil {
return nil, err
@ -640,7 +488,7 @@ func (e *QueryExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesS
return []*models.Row{row}, nil
}
func (e *QueryExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) {
diags, err := e.Monitor.Diagnostics()
if err != nil {
return nil, err
@ -668,12 +516,7 @@ func (e *QueryExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagn
return rows, nil
}
func (e *QueryExecutor) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) {
// FIXME(benbjohnson): Rewrite to use new query engine.
return e.TSDBStore.ExecuteShowFieldKeysStatement(stmt, database)
}
func (e *QueryExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) {
priv, err := e.MetaClient.UserPrivileges(q.Name)
if err != nil {
return nil, err
@ -686,11 +529,7 @@ func (e *QueryExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrants
return []*models.Row{row}, nil
}
func (e *QueryExecutor) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
return influxql.ExecuteShowQueriesStatement(e.QueryManager, q)
}
func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
di, err := e.MetaClient.Database(q.Database)
if err != nil {
return nil, err
@ -705,7 +544,7 @@ func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRe
return []*models.Row{row}, nil
}
func (e *QueryExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) {
dis, err := e.MetaClient.Databases()
if err != nil {
return nil, err
@ -746,7 +585,7 @@ func (e *QueryExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStat
return rows, nil
}
func (e *QueryExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) {
dis, err := e.MetaClient.Databases()
if err != nil {
return nil, err
@ -777,7 +616,7 @@ func (e *QueryExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShard
return []*models.Row{row}, nil
}
func (e *QueryExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) {
stats, err := e.Monitor.Statistics(nil)
if err != nil {
return nil, err
@ -801,7 +640,7 @@ func (e *QueryExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatem
return rows, nil
}
func (e *QueryExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) {
dis, err := e.MetaClient.Databases()
if err != nil {
return nil, err
@ -822,11 +661,7 @@ func (e *QueryExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSub
return rows, nil
}
func (e *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
return e.TSDBStore.ExecuteShowTagValuesStatement(stmt, database)
}
func (e *QueryExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) {
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) {
row := &models.Row{Columns: []string{"user", "admin"}}
for _, ui := range e.MetaClient.Users() {
row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
@ -834,11 +669,7 @@ func (e *QueryExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement
return []*models.Row{row}, nil
}
func (e *QueryExecutor) logger() *log.Logger {
return log.New(e.LogOutput, "[query] ", log.LstdFlags)
}
func (e *QueryExecutor) writeInto(stmt *influxql.SelectStatement, row *models.Row) error {
func (e *StatementExecutor) writeInto(stmt *influxql.SelectStatement, row *models.Row) error {
if stmt.Target.Measurement.Database == "" {
return errNoDatabaseInTarget
}
@ -912,8 +743,8 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
return points, nil
}
// normalizeStatement adds a default database and policy to the measurements in statement.
func (e *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error) {
// NormalizeStatement adds a default database and policy to the measurements in statement.
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error) {
influxql.WalkFunc(stmt, func(node influxql.Node) {
if err != nil {
return
@ -930,7 +761,7 @@ func (e *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatab
return
}
func (e *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
func (e *StatementExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
// Targets (measurements in an INTO clause) can have blank names, which means it will be
// the same as the measurement name it came from in the FROM clause.
if !m.IsTarget && m.Name == "" && m.Regex == nil {
@ -983,9 +814,11 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
IteratorCreator(shards []uint64) (influxql.IteratorCreator, error)
}
// ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.
type ShardIteratorCreator interface {
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
@ -80,7 +79,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
// Ensure query executor can enforce a maximum series selection count.
func TestQueryExecutor_ExecuteQuery_MaxSelectSeriesN(t *testing.T) {
e := DefaultQueryExecutor()
e.MaxSelectSeriesN = 3
e.StatementExecutor.MaxSelectSeriesN = 3
// The meta client should return a two shards on the local node.
e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
@ -123,7 +122,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectSeriesN(t *testing.T) {
// Ensure query executor can enforce a maximum bucket selection count.
func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
e := DefaultQueryExecutor()
e.MaxSelectBucketsN = 3
e.StatementExecutor.MaxSelectBucketsN = 3
// The meta client should return a single shards on the local node.
e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
@ -161,22 +160,25 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
// QueryExecutor is a test wrapper for cluster.QueryExecutor.
type QueryExecutor struct {
*cluster.QueryExecutor
*influxql.QueryExecutor
MetaClient MetaClient
TSDBStore TSDBStore
LogOutput bytes.Buffer
MetaClient MetaClient
TSDBStore TSDBStore
StatementExecutor *cluster.StatementExecutor
LogOutput bytes.Buffer
}
// NewQueryExecutor returns a new instance of QueryExecutor.
// This query executor always has a node id of 0.
func NewQueryExecutor() *QueryExecutor {
e := &QueryExecutor{
QueryExecutor: cluster.NewQueryExecutor(),
QueryExecutor: influxql.NewQueryExecutor(),
}
e.Node = &influxdb.Node{ID: 0}
e.QueryExecutor.MetaClient = &e.MetaClient
e.QueryExecutor.TSDBStore = &e.TSDBStore
e.StatementExecutor = &cluster.StatementExecutor{
MetaClient: &e.MetaClient,
TSDBStore: &e.TSDBStore,
}
e.QueryExecutor.StatementExecutor = e.StatementExecutor
e.QueryExecutor.LogOutput = &e.LogOutput
if testing.Verbose() {
@ -190,7 +192,6 @@ func NewQueryExecutor() *QueryExecutor {
func DefaultQueryExecutor() *QueryExecutor {
e := NewQueryExecutor()
e.MetaClient.DatabaseFn = DefaultMetaClientDatabaseFn
e.TSDBStore.ExpandSourcesFn = DefaultTSDBStoreExpandSourcesFn
return e
}
@ -204,15 +205,12 @@ type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
WriteToShardFn func(shardID uint64, points []models.Point) error
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}
func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error {
@ -246,31 +244,31 @@ func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, con
return s.DeleteSeriesFn(database, sources, condition)
}
func (s *TSDBStore) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) {
return s.ExecuteShowFieldKeysStatementFn(stmt, database)
}
func (s *TSDBStore) IteratorCreator(shards []uint64) (influxql.IteratorCreator, error) {
// Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0)
if err := func() error {
for _, id := range shards {
ic := s.ShardIteratorCreator(id)
if ic == nil {
continue
}
ics = append(ics, ic)
}
func (s *TSDBStore) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
return s.ExecuteShowTagValuesStatementFn(stmt, database)
}
return nil
}(); err != nil {
influxql.IteratorCreators(ics).Close()
return nil, err
}
func (s *TSDBStore) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return s.ExpandSourcesFn(sources)
return influxql.IteratorCreators(ics), nil
}
func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
return s.ShardIteratorCreatorFn(id)
}
// DefaultTSDBStoreExpandSourcesFn expands a single source using the default database & retention policy.
func DefaultTSDBStoreExpandSourcesFn(sources influxql.Sources) (influxql.Sources, error) {
return influxql.Sources{&influxql.Measurement{
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
Name: sources[0].(*influxql.Measurement).Name},
}, nil
}
// MustParseQuery parses s into a query. Panic on error.
func MustParseQuery(s string) *influxql.Query {
q, err := influxql.ParseQuery(s)

View File

@ -55,7 +55,7 @@ type Server struct {
MetaClient *meta.Client
TSDBStore *tsdb.Store
QueryExecutor *cluster.QueryExecutor
QueryExecutor *influxql.QueryExecutor
PointsWriter *cluster.PointsWriter
Subscriber *subscriber.Service
@ -166,16 +166,18 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.PointsWriter.Subscriber = s.Subscriber
// Initialize query executor.
s.QueryExecutor = cluster.NewQueryExecutor()
s.QueryExecutor.MetaClient = s.MetaClient
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor = influxql.NewQueryExecutor()
s.QueryExecutor.StatementExecutor = &cluster.StatementExecutor{
MetaClient: s.MetaClient,
TSDBStore: s.TSDBStore,
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Cluster.MaxSelectPointN,
MaxSelectSeriesN: c.Cluster.MaxSelectSeriesN,
MaxSelectBucketsN: c.Cluster.MaxSelectBucketsN,
}
s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout)
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries)
s.QueryExecutor.MaxSelectPointN = c.Cluster.MaxSelectPointN
s.QueryExecutor.MaxSelectSeriesN = c.Cluster.MaxSelectSeriesN
s.QueryExecutor.MaxSelectBucketsN = c.Cluster.MaxSelectBucketsN
s.QueryExecutor.MaxConcurrentQueries = c.Cluster.MaxConcurrentQueries
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}
@ -319,8 +321,8 @@ func (s *Server) Close() error {
s.PointsWriter.Close()
}
if s.QueryExecutor.QueryManager != nil {
s.QueryExecutor.QueryManager.Close()
if s.QueryExecutor != nil {
s.QueryExecutor.Close()
}
// Close the TSDBStore, no more reads or writes at this point

View File

@ -2215,9 +2215,6 @@ func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges {
type DropShardStatement struct {
// ID of the shard to be dropped.
ID uint64
// Meta indicates if the server being dropped is a meta or data node
Meta bool
}
// String returns a string representation of the drop series statement.

View File

@ -2,13 +2,17 @@ package influxql
import (
"errors"
"expvar"
"fmt"
)
"io"
"io/ioutil"
"log"
"sync"
"time"
// QueryExecutor executes every statement in an Query.
type QueryExecutor interface {
ExecuteQuery(query *Query, database string, chunkSize int, closing chan struct{}) <-chan *Result
}
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
)
var (
// ErrInvalidQuery is returned when executing an unknown query type.
@ -17,6 +21,36 @@ var (
// ErrNotExecuted is returned when a statement is not executed in a query.
// This can occur when a previous statement in the same query has errored.
ErrNotExecuted = errors.New("not executed")
// ErrQueryInterrupted is an error returned when the query is interrupted.
ErrQueryInterrupted = errors.New("query interrupted")
// ErrMaxConcurrentQueriesReached is an error when a query cannot be run
// because the maximum number of queries has been reached.
ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached")
// ErrQueryEngineShutdown is an error sent when the query cannot be
// created because the query engine was shutdown.
ErrQueryEngineShutdown = errors.New("query engine shutdown")
// ErrMaxPointsReached is an error when a query hits the maximum number of
// points.
ErrMaxPointsReached = errors.New("max number of points reached")
// ErrQueryTimeoutReached is an error when a query hits the timeout.
ErrQueryTimeoutReached = errors.New("query timeout reached")
)
const (
// DefaultQueryTimeout is the default timeout for executing a query.
// A value of zero will have no query timeout.
DefaultQueryTimeout = time.Duration(0)
)
// Statistics for the QueryExecutor
const (
statQueriesActive = "queriesActive" // Number of queries currently being executed
statQueryExecutionDuration = "queryDurationNs" // Total (wall) time spent executing queries
)
// ErrDatabaseNotFound returns a database not found error for the given database name.
@ -24,3 +58,391 @@ func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not fo
// ErrMeasurementNotFound returns a measurement not found error for the given measurement name.
func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) }
// ExecutionContext contains state that the query is currently executing with.
type ExecutionContext struct {
// The statement ID of the executing query.
StatementID int
// The query ID of the executing query.
QueryID uint64
// The query task information available to the StatementExecutor.
Query *QueryTask
// Output channel where results and errors should be sent.
Results chan *Result
// The database the query is running against.
Database string
// The requested maximum number of points to return in each result.
ChunkSize int
// A channel that is closed when the query is interrupted.
InterruptCh <-chan struct{}
}
// StatementExecutor executes a statement within the QueryExecutor.
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
// results channel in the ExecutionContext.
ExecuteStatement(stmt Statement, ctx *ExecutionContext) error
// NormalizeStatement adds a default database and policy to the
// measurements in the statement.
NormalizeStatement(stmt Statement, database string) error
}
// QueryExecutor executes every statement in an Query.
type QueryExecutor struct {
// Used for executing a statement in the query.
StatementExecutor StatementExecutor
// Query execution timeout.
QueryTimeout time.Duration
// Maximum number of concurrent queries.
MaxConcurrentQueries int
// Output of all logging.
// Defaults to discarding all log output.
LogOutput io.Writer
// Used for managing and tracking running queries.
queries map[uint64]*QueryTask
nextID uint64
mu sync.RWMutex
shutdown bool
// expvar-based stats.
statMap *expvar.Map
}
// NewQueryExecutor returns a new instance of QueryExecutor.
func NewQueryExecutor() *QueryExecutor {
return &QueryExecutor{
QueryTimeout: DefaultQueryTimeout,
LogOutput: ioutil.Discard,
queries: make(map[uint64]*QueryTask),
nextID: 1,
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
}
}
// Close kills all running queries and prevents new queries from being attached.
func (e *QueryExecutor) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
e.shutdown = true
for _, query := range e.queries {
query.setError(ErrQueryEngineShutdown)
close(query.closing)
}
e.queries = nil
return nil
}
// ExecuteQuery executes each statement within a query.
func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, closing chan struct{}) <-chan *Result {
results := make(chan *Result)
go e.executeQuery(query, database, chunkSize, closing, results)
return results
}
func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize int, closing <-chan struct{}, results chan *Result) {
defer close(results)
e.statMap.Add(statQueriesActive, 1)
defer func(start time.Time) {
e.statMap.Add(statQueriesActive, -1)
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
}(time.Now())
qid, task, err := e.attachQuery(query, database, closing)
if err != nil {
results <- &Result{Err: err}
return
}
defer e.killQuery(qid)
logger := e.logger()
// Setup the execution context that will be used when executing statements.
ctx := ExecutionContext{
QueryID: qid,
Query: task,
Results: results,
Database: database,
ChunkSize: chunkSize,
InterruptCh: task.closing,
}
var i int
loop:
for ; i < len(query.Statements); i++ {
ctx.StatementID = i
stmt := query.Statements[i]
// If a default database wasn't passed in by the caller, check the statement.
defaultDB := database
if defaultDB == "" {
if s, ok := stmt.(HasDefaultDatabase); ok {
defaultDB = s.DefaultDatabase()
}
}
// Rewrite statements, if necessary.
// This can occur on meta read statements which convert to SELECT statements.
newStmt, err := RewriteStatement(stmt)
if err != nil {
results <- &Result{Err: err}
break
}
stmt = newStmt
// Normalize each statement.
if err := e.StatementExecutor.NormalizeStatement(stmt, defaultDB); err != nil {
results <- &Result{Err: err}
break
}
// Log each normalized statement.
logger.Println(stmt.String())
// Handle a query management queries specially so they don't go
// to the underlying statement executor.
switch stmt := stmt.(type) {
case *ShowQueriesStatement:
rows, err := e.executeShowQueriesStatement(stmt)
results <- &Result{
StatementID: i,
Series: rows,
Err: err,
}
if err != nil {
break loop
}
continue loop
case *KillQueryStatement:
err := e.executeKillQueryStatement(stmt)
results <- &Result{
StatementID: i,
Err: err,
}
if err != nil {
break loop
}
continue loop
}
// Send any other statements to the underlying statement executor.
err = e.StatementExecutor.ExecuteStatement(stmt, &ctx)
if err == ErrQueryInterrupted {
// Query was interrupted so retrieve the real interrupt error from
// the query task if there is one.
if qerr := task.Error(); qerr != nil {
err = qerr
}
}
// Send an error for this result if it failed for some reason.
if err != nil {
results <- &Result{
StatementID: i,
Err: err,
}
// Stop after the first error.
break
}
}
// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
results <- &Result{
StatementID: i,
Err: ErrNotExecuted,
}
}
}
func (e *QueryExecutor) executeKillQueryStatement(stmt *KillQueryStatement) error {
return e.killQuery(stmt.QueryID)
}
func (e *QueryExecutor) executeShowQueriesStatement(q *ShowQueriesStatement) (models.Rows, error) {
e.mu.RLock()
defer e.mu.RUnlock()
now := time.Now()
values := make([][]interface{}, 0, len(e.queries))
for id, qi := range e.queries {
d := now.Sub(qi.startTime)
var ds string
if d == 0 {
ds = "0s"
} else if d < time.Second {
ds = fmt.Sprintf("%du", d)
} else {
ds = (d - (d % time.Second)).String()
}
values = append(values, []interface{}{id, qi.query, qi.database, ds})
}
return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Values: values,
}}, nil
}
func (e *QueryExecutor) logger() *log.Logger {
return log.New(e.LogOutput, "[query] ", log.LstdFlags)
}
func (e *QueryExecutor) query(qid uint64) (*QueryTask, bool) {
e.mu.RLock()
query, ok := e.queries[qid]
e.mu.RUnlock()
return query, ok
}
// attachQuery attaches a running query to be managed by the QueryExecutor.
// Returns the query id of the newly attached query or an error if it was
// unable to assign a query id or attach the query to the QueryExecutor.
// This function also returns a channel that will be closed when this
// query finishes running.
//
// After a query finishes running, the system is free to reuse a query id.
func (e *QueryExecutor) attachQuery(q *Query, database string, interrupt <-chan struct{}) (uint64, *QueryTask, error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.shutdown {
return 0, nil, ErrQueryEngineShutdown
}
if e.MaxConcurrentQueries > 0 && len(e.queries) >= e.MaxConcurrentQueries {
return 0, nil, ErrMaxConcurrentQueriesReached
}
qid := e.nextID
query := &QueryTask{
query: q.String(),
database: database,
startTime: time.Now(),
closing: make(chan struct{}),
monitorCh: make(chan error),
}
e.queries[qid] = query
go e.waitForQuery(qid, query.closing, interrupt, query.monitorCh)
e.nextID++
return qid, query, nil
}
// killQuery stops and removes a query from the QueryExecutor.
// This method can be used to forcefully terminate a running query.
func (e *QueryExecutor) killQuery(qid uint64) error {
e.mu.Lock()
defer e.mu.Unlock()
query, ok := e.queries[qid]
if !ok {
return fmt.Errorf("no such query id: %d", qid)
}
close(query.closing)
delete(e.queries, qid)
return nil
}
func (e *QueryExecutor) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) {
var timer <-chan time.Time
if e.QueryTimeout != 0 {
t := time.NewTimer(e.QueryTimeout)
timer = t.C
defer t.Stop()
}
select {
case <-closing:
query, ok := e.query(qid)
if !ok {
break
}
query.setError(ErrQueryInterrupted)
case err := <-monitorCh:
if err == nil {
break
}
query, ok := e.query(qid)
if !ok {
break
}
query.setError(err)
case <-timer:
query, ok := e.query(qid)
if !ok {
break
}
query.setError(ErrQueryTimeoutReached)
case <-interrupt:
// Query was manually closed so exit the select.
return
}
e.killQuery(qid)
}
// QueryMonitorFunc is a function that will be called to check if a query
// is currently healthy. If the query needs to be interrupted for some reason,
// the error should be returned by this function.
type QueryMonitorFunc func(<-chan struct{}) error
// QueryTask is the internal data structure for managing queries.
// For the public use data structure that gets returned, see QueryTask.
type QueryTask struct {
query string
database string
startTime time.Time
closing chan struct{}
monitorCh chan error
err error
mu sync.Mutex
}
// Monitor starts a new goroutine that will monitor a query. The function
// will be passed in a channel to signal when the query has been finished
// normally. If the function returns with an error and the query is still
// running, the query will be terminated.
func (q *QueryTask) Monitor(fn QueryMonitorFunc) {
go q.monitor(fn)
}
// Error returns any asynchronous error that may have occured while executing
// the query.
func (q *QueryTask) Error() error {
q.mu.Lock()
defer q.mu.Unlock()
return q.err
}
func (q *QueryTask) setError(err error) {
q.mu.Lock()
q.err = err
q.mu.Unlock()
}
func (q *QueryTask) monitor(fn QueryMonitorFunc) {
if err := fn(q.closing); err != nil {
select {
case <-q.closing:
case q.monitorCh <- err:
}
}
}

View File

@ -0,0 +1,257 @@
package influxql_test
import (
"errors"
"fmt"
"testing"
"time"
"github.com/influxdata/influxdb/influxql"
)
var errUnexpected = errors.New("unexpected error")
type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
}
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx)
}
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error {
return nil
}
func TestQueryExecutor_AttachQuery(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
if ctx.QueryID != 1 {
t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID)
}
return nil
},
}
discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
}
func TestQueryExecutor_KillQuery(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qid := make(chan uint64)
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
qid <- ctx.QueryID
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected
}
},
}
results := e.ExecuteQuery(q, "mydb", 100, nil)
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
if err != nil {
t.Fatal(err)
}
discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
result := <-results
if result.Err != influxql.ErrQueryInterrupted {
t.Errorf("unexpected error: %s", result.Err)
}
}
func TestQueryExecutor_Interrupt(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected
}
},
}
closing := make(chan struct{})
results := e.ExecuteQuery(q, "mydb", 100, closing)
close(closing)
result := <-results
if result.Err != influxql.ErrQueryInterrupted {
t.Errorf("unexpected error: %s", result.Err)
}
}
func TestQueryExecutor_ShowQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
t.Errorf("unexpected statement: %s", stmt)
return errUnexpected
},
}
q, err = influxql.ParseQuery(`SHOW QUERIES`)
if err != nil {
t.Fatal(err)
}
results := e.ExecuteQuery(q, "", 100, nil)
result := <-results
if len(result.Series) != 1 {
t.Errorf("expected %d rows, got %d", 1, len(result.Series))
}
if result.Err != nil {
t.Errorf("unexpected error: %s", result.Err)
}
}
func TestQueryExecutor_Limit_Timeout(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case <-time.After(time.Second):
t.Errorf("timeout has not killed the query")
return errUnexpected
}
},
}
e.QueryTimeout = time.Nanosecond
results := e.ExecuteQuery(q, "mydb", 100, nil)
result := <-results
if result.Err != influxql.ErrQueryTimeoutReached {
t.Errorf("unexpected error: %s", result.Err)
}
}
func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qid := make(chan uint64)
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
qid <- ctx.QueryID
<-ctx.InterruptCh
return influxql.ErrQueryInterrupted
},
}
e.MaxConcurrentQueries = 1
defer e.Close()
// Start first query and wait for it to be executing.
go discardOutput(e.ExecuteQuery(q, "mydb", 100, nil))
<-qid
// Start second query and expect for it to fail.
results := e.ExecuteQuery(q, "mydb", 100, nil)
select {
case result := <-results:
if len(result.Series) != 0 {
t.Errorf("expected %d rows, got %d", 0, len(result.Series))
}
if result.Err != influxql.ErrMaxConcurrentQueriesReached {
t.Errorf("unexpected error: %s", result.Err)
}
case <-qid:
t.Errorf("unexpected statement execution for the second query")
}
}
func TestQueryExecutor_Close(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
ch1 := make(chan struct{})
ch2 := make(chan struct{})
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
close(ch1)
<-ctx.InterruptCh
close(ch2)
return influxql.ErrQueryInterrupted
},
}
results := e.ExecuteQuery(q, "mydb", 100, nil)
go func(results <-chan *influxql.Result) {
result := <-results
if result.Err != influxql.ErrQueryEngineShutdown {
t.Errorf("unexpected error: %s", result.Err)
}
}(results)
// Wait for the statement to start executing.
<-ch1
// Close the query executor.
e.Close()
// Check that the statement gets interrupted and finishes.
select {
case <-ch2:
case <-time.After(100 * time.Millisecond):
t.Error("closing the query manager did not kill the query after 100 milliseconds")
}
results = e.ExecuteQuery(q, "mydb", 100, nil)
result := <-results
if len(result.Series) != 0 {
t.Errorf("expected %d rows, got %d", 0, len(result.Series))
}
if result.Err != influxql.ErrQueryEngineShutdown {
t.Errorf("unexpected error: %s", result.Err)
}
}
func discardOutput(results <-chan *influxql.Result) {
for range results {
// Read all results and discard.
}
}

View File

@ -1,310 +0,0 @@
package influxql
import (
"errors"
"fmt"
"sync"
"time"
"github.com/influxdata/influxdb/models"
)
var (
// ErrNoQueryManager is an error sent when a SHOW QUERIES or KILL QUERY
// statement is issued with no query manager.
ErrNoQueryManager = errors.New("no query manager available")
// ErrQueryInterrupted is an error returned when the query is interrupted.
ErrQueryInterrupted = errors.New("query interrupted")
// ErrMaxConcurrentQueriesReached is an error when a query cannot be run
// because the maximum number of queries has been reached.
ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached")
// ErrQueryManagerShutdown is an error sent when the query cannot be
// attached because it was previous shutdown.
ErrQueryManagerShutdown = errors.New("query manager shutdown")
// ErrMaxPointsReached is an error when a query hits the maximum number of
// points.
ErrMaxPointsReached = errors.New("max number of points reached")
// ErrQueryTimeoutReached is an error when a query hits the timeout.
ErrQueryTimeoutReached = errors.New("query timeout reached")
)
// QueryTaskInfo holds information about a currently running query.
type QueryTaskInfo struct {
ID uint64
Query string
Database string
Duration time.Duration
}
// QueryParams holds the parameters used for creating a new query.
type QueryParams struct {
// The query to be tracked. Required.
Query *Query
// The database this query is being run in. Required.
Database string
// The timeout for automatically killing a query that hasn't finished. Optional.
Timeout time.Duration
// The channel to watch for when this query is interrupted or finished.
// Not required, but highly recommended. If this channel is not set, the
// query needs to be manually managed by the caller.
InterruptCh <-chan struct{}
// Holds any error thrown by the QueryManager if there is a problem while
// executing the query. Optional.
Error *QueryError
}
// QueryError is an error thrown by the QueryManager when there is a problem
// while executing the query.
type QueryError struct {
err error
mu sync.Mutex
}
// Error returns any reason why the QueryManager may have
// terminated the query. If a query completed successfully,
// this value will be nil.
func (q *QueryError) Error() error {
q.mu.Lock()
defer q.mu.Unlock()
return q.err
}
// QueryMonitorFunc is a function that will be called to check if a query
// is currently healthy. If the query needs to be interrupted for some reason,
// the error should be returned by this function.
type QueryMonitorFunc func(<-chan struct{}) error
type QueryManager interface {
// AttachQuery attaches a running query to be managed by the query manager.
// Returns the query id of the newly attached query or an error if it was
// unable to assign a query id or attach the query to the query manager.
// This function also returns a channel that will be closed when this
// query finishes running.
//
// After a query finishes running, the system is free to reuse a query id.
AttachQuery(params *QueryParams) (qid uint64, closing <-chan struct{}, err error)
// KillQuery stops and removes a query from the query manager.
// This method can be used to forcefully terminate a running query.
KillQuery(qid uint64) error
// MonitorQuery starts a new goroutine that will monitor a query.
// The function will be passed in a channel to signal when the query has been
// finished normally. If the function returns with an error and the query is
// still running, the query will be terminated.
//
// Query managers that do not implement this functionality should return an error.
MonitorQuery(qid uint64, fn QueryMonitorFunc) error
// Close kills all running queries and prevents new queries from being attached.
Close() error
// Queries lists the currently running tasks.
Queries() []QueryTaskInfo
}
func DefaultQueryManager(maxQueries int) QueryManager {
return &defaultQueryManager{
queries: make(map[uint64]*queryTask),
nextID: 1,
maxQueries: maxQueries,
}
}
func ExecuteShowQueriesStatement(qm QueryManager, q *ShowQueriesStatement) (models.Rows, error) {
if qm == nil {
return nil, ErrNoQueryManager
}
queries := qm.Queries()
values := make([][]interface{}, len(queries))
for i, qi := range queries {
var d string
if qi.Duration == 0 {
d = "0s"
} else if qi.Duration < time.Second {
d = fmt.Sprintf("%du", qi.Duration)
} else {
d = (qi.Duration - (qi.Duration % time.Second)).String()
}
values[i] = []interface{}{
qi.ID,
qi.Query,
qi.Database,
d,
}
}
return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Values: values,
}}, nil
}
// queryTask is the internal data structure for managing queries.
// For the public use data structure that gets returned, see QueryTask.
type queryTask struct {
query string
database string
startTime time.Time
closing chan struct{}
monitorCh chan error
err *QueryError
once sync.Once
}
func (q *queryTask) setError(err error) {
if q.err != nil {
q.err.mu.Lock()
defer q.err.mu.Unlock()
q.err.err = err
}
}
func (q *queryTask) monitor(fn QueryMonitorFunc) {
if err := fn(q.closing); err != nil {
select {
case <-q.closing:
case q.monitorCh <- err:
}
}
}
type defaultQueryManager struct {
queries map[uint64]*queryTask
nextID uint64
maxQueries int
mu sync.Mutex
shutdown bool
}
func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan struct{}, error) {
qm.mu.Lock()
defer qm.mu.Unlock()
if qm.shutdown {
return 0, nil, ErrQueryManagerShutdown
}
if qm.maxQueries > 0 && len(qm.queries) >= qm.maxQueries {
return 0, nil, ErrMaxConcurrentQueriesReached
}
qid := qm.nextID
query := &queryTask{
query: params.Query.String(),
database: params.Database,
startTime: time.Now(),
closing: make(chan struct{}),
monitorCh: make(chan error),
err: params.Error,
}
qm.queries[qid] = query
go qm.waitForQuery(qid, params.Timeout, params.InterruptCh, query.monitorCh)
qm.nextID++
return qid, query.closing, nil
}
func (qm *defaultQueryManager) Query(qid uint64) (*queryTask, bool) {
qm.mu.Lock()
query, ok := qm.queries[qid]
qm.mu.Unlock()
return query, ok
}
func (qm *defaultQueryManager) waitForQuery(qid uint64, timeout time.Duration, closing <-chan struct{}, monitorCh <-chan error) {
var timer <-chan time.Time
if timeout != 0 {
timer = time.After(timeout)
}
select {
case <-closing:
query, ok := qm.Query(qid)
if !ok {
break
}
query.setError(ErrQueryInterrupted)
case err := <-monitorCh:
if err == nil {
break
}
query, ok := qm.Query(qid)
if !ok {
break
}
query.setError(err)
case <-timer:
query, ok := qm.Query(qid)
if !ok {
break
}
query.setError(ErrQueryTimeoutReached)
}
qm.KillQuery(qid)
}
func (qm *defaultQueryManager) MonitorQuery(qid uint64, fn QueryMonitorFunc) error {
query, ok := qm.Query(qid)
if !ok {
return fmt.Errorf("no such query id: %d", qid)
}
go query.monitor(fn)
return nil
}
func (qm *defaultQueryManager) KillQuery(qid uint64) error {
qm.mu.Lock()
defer qm.mu.Unlock()
query, ok := qm.queries[qid]
if !ok {
return fmt.Errorf("no such query id: %d", qid)
}
close(query.closing)
delete(qm.queries, qid)
return nil
}
func (qm *defaultQueryManager) Close() error {
qm.mu.Lock()
defer qm.mu.Unlock()
qm.shutdown = true
for _, query := range qm.queries {
query.setError(ErrQueryManagerShutdown)
close(query.closing)
}
qm.queries = nil
return nil
}
func (qm *defaultQueryManager) Queries() []QueryTaskInfo {
qm.mu.Lock()
defer qm.mu.Unlock()
now := time.Now()
queries := make([]QueryTaskInfo, 0, len(qm.queries))
for qid, task := range qm.queries {
queries = append(queries, QueryTaskInfo{
ID: qid,
Query: task.query,
Database: task.database,
Duration: now.Sub(task.startTime),
})
}
return queries
}

View File

@ -1,207 +0,0 @@
package influxql_test
import (
"fmt"
"testing"
"time"
"github.com/influxdata/influxdb/influxql"
)
func TestQueryManager_AttachQuery(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, _, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
defer qm.KillQuery(qid)
if qid != 1 {
t.Errorf("incorrect query id: exp=1 got=%d", qid)
}
}
func TestQueryManager_KillQuery(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
qm.KillQuery(qid)
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds")
}
if err := qm.KillQuery(qid); err == nil || err.Error() != fmt.Sprintf("no such query id: %d", qid) {
t.Errorf("incorrect error killing query, got %s", err)
}
}
func TestQueryManager_Interrupt(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
closing := make(chan struct{})
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
InterruptCh: closing,
}
_, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
close(closing)
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Error("interrupting the query did not close the channel after 100 milliseconds")
}
}
func TestQueryManager_Queries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, _, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
queries := qm.Queries()
if len(queries) != 1 {
t.Errorf("expected 1 query, got %d", len(queries))
} else {
qi := queries[0]
if qi.ID != qid {
t.Errorf("query id: exp=%d got=%d", qid, qi.ID)
}
if qi.Query != `SELECT count(value) FROM cpu` {
t.Errorf("query id: incorrect query string, got '%s'", qi.Query)
}
if qi.Database != "mydb" {
t.Errorf("query id: incorrect database, got %s", qi.Database)
}
}
qm.KillQuery(qid)
queries = qm.Queries()
if len(queries) != 0 {
t.Errorf("expected 0 queries, got %d", len(queries))
}
}
func TestQueryManager_Limit_Timeout(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
Timeout: time.Nanosecond,
}
_, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Errorf("timeout has not killed the query")
}
}
func TestQueryManager_Limit_ConcurrentQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(1)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
qid, _, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
defer qm.KillQuery(qid)
_, _, err = qm.AttachQuery(&params)
if err == nil || err != influxql.ErrMaxConcurrentQueriesReached {
t.Errorf("unexpected error: %s", err)
}
}
func TestQueryManager_Close(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}
_, ch, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
qm.Close()
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Error("closing the query manager did not kill the query after 100 milliseconds")
}
_, _, err = qm.AttachQuery(&params)
if err == nil || err != influxql.ErrQueryManagerShutdown {
t.Errorf("unexpected error: %s", err)
}
}

View File

@ -67,7 +67,7 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
// Service manages continuous query execution.
type Service struct {
MetaClient metaClient
QueryExecutor influxql.QueryExecutor
QueryExecutor *influxql.QueryExecutor
Config *Config
RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs.

View File

@ -48,16 +48,16 @@ func TestContinuousQueryService_Run(t *testing.T) {
expectCallCnt := 3
callCnt := 0
// Set a callback for ExecuteQuery.
qe := s.QueryExecutor.(*QueryExecutor)
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
dummych := make(chan *influxql.Result, 1)
dummych <- &influxql.Result{}
return dummych
// Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
ctx.Results <- &influxql.Result{}
return nil
},
}
// Use a custom "now" time since the internals of last run care about
@ -121,16 +121,16 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) {
expectCallCnt := 0
callCnt := 0
// Set a callback for ExecuteQuery.
qe := s.QueryExecutor.(*QueryExecutor)
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
dummych := make(chan *influxql.Result, 1)
dummych <- &influxql.Result{}
return dummych
// Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
ctx.Results <- &influxql.Result{}
return nil
},
}
s.Open()
@ -186,15 +186,15 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) {
callCnt := 0
// Set a callback for ExecuteQuery.
qe := s.QueryExecutor.(*QueryExecutor)
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
dummych := make(chan *influxql.Result, 1)
dummych <- &influxql.Result{}
return dummych
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
ctx.Results <- &influxql.Result{}
return nil
},
}
s.Open()
@ -240,13 +240,13 @@ func TestContinuousQueryService_NotLeader(t *testing.T) {
s.MetaClient.(*MetaClient).Leader = false
done := make(chan struct{})
qe := s.QueryExecutor.(*QueryExecutor)
// Set a callback for ExecuteQuery. Shouldn't get called because we're not the leader.
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
done <- struct{}{}
dummych := make(chan *influxql.Result, 1)
dummych <- &influxql.Result{Err: errUnexpected}
return dummych
// Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
done <- struct{}{}
ctx.Results <- &influxql.Result{Err: errUnexpected}
return nil
},
}
s.Open()
@ -267,13 +267,13 @@ func TestContinuousQueryService_MetaClientFailsToGetDatabases(t *testing.T) {
s.MetaClient.(*MetaClient).Err = errExpected
done := make(chan struct{})
qe := s.QueryExecutor.(*QueryExecutor)
// Set ExecuteQuery callback, which shouldn't get called because of meta store failure.
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
done <- struct{}{}
dummych := make(chan *influxql.Result, 1)
dummych <- &influxql.Result{Err: errUnexpected}
return dummych
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
done <- struct{}{}
ctx.Results <- &influxql.Result{Err: errUnexpected}
return nil
},
}
s.Open()
@ -289,6 +289,11 @@ func TestContinuousQueryService_MetaClientFailsToGetDatabases(t *testing.T) {
// Test ExecuteContinuousQuery with invalid queries.
func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return errUnexpected
},
}
dbis, _ := s.MetaClient.Databases()
dbi := dbis[0]
cqi := dbi.ContinuousQueries[0]
@ -317,8 +322,11 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
// Test ExecuteContinuousQuery when QueryExecutor returns an error.
func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
s := NewTestService(t)
qe := s.QueryExecutor.(*QueryExecutor)
qe.Err = errExpected
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return errExpected
},
}
dbis, _ := s.MetaClient.Databases()
dbi := dbis[0]
@ -336,7 +344,7 @@ func NewTestService(t *testing.T) *Service {
s := NewService(NewConfig())
ms := NewMetaClient(t)
s.MetaClient = ms
s.QueryExecutor = NewQueryExecutor(t)
s.QueryExecutor = influxql.NewQueryExecutor()
s.RunInterval = time.Millisecond
// Set Logger to write to dev/null so stdout isn't polluted.
@ -473,62 +481,34 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error
// QueryExecutor is a mock query executor.
type QueryExecutor struct {
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result
Results []*influxql.Result
ResultInterval time.Duration
Err error
ErrAfterResult int
t *testing.T
*influxql.QueryExecutor
Err error
t *testing.T
}
// StatementExecutor is a mock statement executor.
type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
}
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx)
}
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error {
return nil
}
// NewQueryExecutor returns a *QueryExecutor.
func NewQueryExecutor(t *testing.T) *QueryExecutor {
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{}
return &QueryExecutor{
ErrAfterResult: -1,
t: t,
QueryExecutor: e,
t: t,
}
}
// ExecuteQuery returns a channel that the caller can read query results from.
func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
// If the test set a callback, call it.
if qe.ExecuteQueryFn != nil {
return qe.ExecuteQueryFn(query, database, chunkSize, make(chan struct{}))
}
ch := make(chan *influxql.Result, 1)
// Are we supposed to error immediately?
if qe.ErrAfterResult == -1 && qe.Err != nil {
ch <- &influxql.Result{Err: qe.Err}
close(ch)
return ch
}
// Start a go routine to send results and / or error.
go func() {
n := 0
for i, r := range qe.Results {
if i == qe.ErrAfterResult-1 {
qe.t.Logf("ExecuteQuery(): ErrAfterResult %d", qe.ErrAfterResult-1)
ch <- &influxql.Result{Err: qe.Err}
close(ch)
return
}
ch <- r
n++
time.Sleep(qe.ResultInterval)
}
qe.t.Logf("ExecuteQuery(): all (%d) results sent", n)
if n == 0 {
ch <- &influxql.Result{Err: qe.Err}
}
close(ch)
}()
return ch
}
// PointsWriter is a mock points writer.
type PointsWriter struct {
WritePointsFn func(p *cluster.WritePointsRequest) error

View File

@ -63,7 +63,7 @@ type Handler struct {
AuthorizeQuery(u *meta.UserInfo, query *influxql.Query, database string) error
}
QueryExecutor influxql.QueryExecutor
QueryExecutor *influxql.QueryExecutor
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error

View File

@ -20,17 +20,15 @@ import (
// Ensure the handler returns results from a query (including nil results).
func TestHandler_Query(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
if q.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", q.String())
} else if db != `foo` {
t.Fatalf("unexpected db: %s", db)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` {
t.Fatalf("unexpected db: %s", ctx.Database)
}
return NewResultChan(
&influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})},
&influxql.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})},
nil,
)
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
ctx.Results <- &influxql.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil
}
w := httptest.NewRecorder()
@ -45,13 +43,14 @@ func TestHandler_Query(t *testing.T) {
// Ensure the handler returns results from a query (including nil results).
func TestHandler_QueryRegex(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
if q.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
t.Fatalf("unexpected query: %s", q.String())
} else if db != `test` {
t.Fatalf("unexpected db: %s", db)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `test` {
t.Fatalf("unexpected db: %s", ctx.Database)
}
return NewResultChan(nil)
ctx.Results <- nil
return nil
}
w := httptest.NewRecorder()
@ -61,11 +60,10 @@ func TestHandler_QueryRegex(t *testing.T) {
// Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeResults(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
return NewResultChan(
&influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})},
&influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})},
)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil
}
w := httptest.NewRecorder()
@ -80,11 +78,10 @@ func TestHandler_Query_MergeResults(t *testing.T) {
// Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeEmptyResults(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
return NewResultChan(
&influxql.Result{StatementID: 1, Series: models.Rows{}},
&influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})},
)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows{}}
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil
}
w := httptest.NewRecorder()
@ -99,14 +96,13 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) {
// Ensure the handler can parse chunked and chunk size query parameters.
func TestHandler_Query_Chunked(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
if chunkSize != 2 {
t.Fatalf("unexpected chunk size: %d", chunkSize)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
if ctx.ChunkSize != 2 {
t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
}
return NewResultChan(
&influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})},
&influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})},
)
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil
}
w := httptest.NewRecorder()
@ -161,8 +157,8 @@ func TestHandler_Query_ErrInvalidQuery(t *testing.T) {
// Ensure the handler returns a status 200 if an error is returned in the result.
func TestHandler_Query_ErrResult(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
return NewResultChan(&influxql.Result{Err: errors.New("measurement not found")})
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return errors.New("measurement not found")
}
w := httptest.NewRecorder()
@ -192,6 +188,9 @@ func TestHandler_Ping(t *testing.T) {
// Ensure the handler returns the version correctly from the different endpoints.
func TestHandler_Version(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return nil
}
w := httptest.NewRecorder()
tests := []struct {
method string
@ -281,8 +280,8 @@ func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("mark
// NewHandler represents a test wrapper for httpd.Handler.
type Handler struct {
*httpd.Handler
MetaClient HandlerMetaStore
QueryExecutor HandlerQueryExecutor
MetaClient HandlerMetaStore
StatementExecutor HandlerStatementExecutor
}
// NewHandler returns a new instance of Handler.
@ -292,7 +291,8 @@ func NewHandler(requireAuthentication bool) *Handler {
Handler: httpd.NewHandler(requireAuthentication, true, false, 0, statMap),
}
h.Handler.MetaClient = &h.MetaClient
h.Handler.QueryExecutor = &h.QueryExecutor
h.Handler.QueryExecutor = influxql.NewQueryExecutor()
h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
h.Handler.Version = "0.0.0"
return h
}
@ -325,18 +325,17 @@ func (s *HandlerMetaStore) Users() []meta.UserInfo {
return s.UsersFn()
}
// HandlerQueryExecutor is a mock implementation of Handler.QueryExecutor.
type HandlerQueryExecutor struct {
AuthorizeFn func(u *meta.UserInfo, q *influxql.Query, db string) error
ExecuteQueryFn func(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
type HandlerStatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
}
func (e *HandlerQueryExecutor) Authorize(u *meta.UserInfo, q *influxql.Query, db string) error {
return e.AuthorizeFn(u, q, db)
func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx)
}
func (e *HandlerQueryExecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int, closing chan struct{}) <-chan *influxql.Result {
return e.ExecuteQueryFn(q, db, chunkSize, closing)
func (e *HandlerStatementExecutor) NormalizeStatement(stmt influxql.Statement, database string) error {
return nil
}
// MustNewRequest returns a new HTTP request. Panic on error.

View File

@ -613,6 +613,27 @@ func (s *Store) IteratorCreators() influxql.IteratorCreators {
return a
}
func (s *Store) IteratorCreator(shards []uint64) (influxql.IteratorCreator, error) {
// Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0)
if err := func() error {
for _, id := range shards {
ic := s.ShardIteratorCreator(id)
if ic == nil {
continue
}
ics = append(ics, ic)
}
return nil
}(); err != nil {
influxql.IteratorCreators(ics).Close()
return nil, err
}
return influxql.IteratorCreators(ics), nil
}
// WriteToShard writes a list of points to a shard identified by its ID.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
s.mu.RLock()
@ -632,55 +653,6 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
return sh.WritePoints(points)
}
func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) {
// NOTE(benbjohnson):
// This function is temporarily moved here until reimplemented in the new query engine.
// Find the database.
db := s.DatabaseIndex(database)
if db == nil {
return nil, nil
}
// Expand regex expressions in the FROM clause.
sources, err := s.ExpandSources(stmt.Sources)
if err != nil {
return nil, err
}
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return nil, err
}
// Make result.
rows := make(models.Rows, 0, len(measurements))
// Loop through measurements, adding a result row for each.
for _, m := range measurements {
// Create a new row.
r := &models.Row{
Name: m.Name,
Columns: []string{"fieldKey"},
}
// Get a list of field names from the measurement then sort them.
names := m.FieldNames()
sort.Strings(names)
// Add the field names to the result row values.
for _, n := range names {
v := interface{}(n)
r.Values = append(r.Values, []interface{}{v})
}
// Append the row to the result.
rows = append(rows, r)
}
return rows, nil
}
// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
// Unlike limit and offset on SELECT statements, the limit and offset don't apply to the number of Rows, but
// to the number of total Values returned, since each Value represents a unique series.
@ -710,87 +682,6 @@ func (e *Store) filterShowSeriesResult(limit, offset int, rows models.Rows) mode
return filteredSeries
}
func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
// NOTE(benbjohnson):
// This function is temporarily moved here until reimplemented in the new query engine.
// Check for time in WHERE clause (not supported).
if influxql.HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG VALUES doesn't support time in WHERE clause")
}
// Find the database.
db := s.DatabaseIndex(database)
if db == nil {
return nil, nil
}
// Expand regex expressions in the FROM clause.
sources, err := s.ExpandSources(stmt.Sources)
if err != nil {
return nil, err
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return nil, err
}
// Make result.
var rows models.Rows
tagValues := make(map[string]stringSet)
for _, m := range measurements {
var ids SeriesIDs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
ids, _, err = m.walkWhereForSeriesIds(stmt.Condition)
if err != nil {
return nil, err
}
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
for k, v := range m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids) {
_, ok := tagValues[k]
if !ok {
tagValues[k] = v
}
tagValues[k] = tagValues[k].union(v)
}
}
for k, v := range tagValues {
r := &models.Row{
Name: k + "TagValues",
Columns: []string{k},
}
vals := v.list()
sort.Strings(vals)
for _, val := range vals {
v := interface{}(val)
r.Values = append(r.Values, []interface{}{v})
}
rows = append(rows, r)
}
sort.Sort(rows)
return rows, nil
}
// IsRetryable returns true if this error is temporary and could be retried
func IsRetryable(err error) bool {
if err == nil {