2015-05-29 14:31:03 +00:00
|
|
|
package tsdb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"os"
|
|
|
|
"runtime"
|
|
|
|
"sort"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/influxdb/influxdb/influxql"
|
|
|
|
"github.com/influxdb/influxdb/meta"
|
|
|
|
)
|
|
|
|
|
|
|
|
type QueryExecutor struct {
|
|
|
|
// The meta store for accessing and updating cluster and schema data.
|
|
|
|
MetaStore interface {
|
|
|
|
Database(name string) (*meta.DatabaseInfo, error)
|
|
|
|
Databases() ([]meta.DatabaseInfo, error)
|
|
|
|
User(name string) (*meta.UserInfo, error)
|
|
|
|
AdminUserExists() (bool, error)
|
|
|
|
Authenticate(username, password string) (*meta.UserInfo, error)
|
|
|
|
RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Executes statements relating to meta data.
|
|
|
|
MetaStatementExecutor interface {
|
|
|
|
ExecuteStatement(stmt influxql.Statement) *influxql.Result
|
|
|
|
}
|
|
|
|
|
|
|
|
// The stats service to report to
|
|
|
|
Stats interface {
|
|
|
|
Add(key string, delta int64)
|
|
|
|
Inc(key string)
|
|
|
|
Name() string
|
|
|
|
Walk(f func(string, int64))
|
|
|
|
}
|
|
|
|
|
|
|
|
Logger *log.Logger
|
|
|
|
|
|
|
|
// the local daata store
|
|
|
|
store *Store
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewQueryExecutor(store *Store) *QueryExecutor {
|
|
|
|
return &QueryExecutor{
|
|
|
|
store: store,
|
|
|
|
Logger: log.New(os.Stderr, "[query] ", log.LstdFlags),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Begin is for influxql/engine.go to use to get a transaction object to start the query
|
|
|
|
func (q *QueryExecutor) Begin() (influxql.Tx, error) {
|
|
|
|
return newTx(q.MetaStore, q.store), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Authorize user u to execute query q on database.
|
|
|
|
// database can be "" for queries that do not require a database.
|
|
|
|
// If u is nil, this means authorization is disabled.
|
|
|
|
func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, database string) error {
|
|
|
|
const authErrLogFmt = "unauthorized request | user: %q | query: %q | database %q\n"
|
|
|
|
|
|
|
|
if u == nil {
|
|
|
|
q.Logger.Printf(authErrLogFmt, "", query.String(), database)
|
|
|
|
return ErrAuthorize{text: "no user provided"}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cluster admins can do anything.
|
|
|
|
if u.Admin {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check each statement in the query.
|
|
|
|
for _, stmt := range query.Statements {
|
|
|
|
// Get the privileges required to execute the statement.
|
|
|
|
privs := stmt.RequiredPrivileges()
|
|
|
|
|
|
|
|
// Make sure the user has each privilege required to execute
|
|
|
|
// the statement.
|
|
|
|
for _, p := range privs {
|
|
|
|
// Use the db name specified by the statement or the db
|
|
|
|
// name passed by the caller if one wasn't specified by
|
|
|
|
// the statement.
|
|
|
|
dbname := p.Name
|
|
|
|
if dbname == "" {
|
|
|
|
dbname = database
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if user has required privilege.
|
|
|
|
if !u.Authorize(p.Privilege, dbname) {
|
|
|
|
var msg string
|
|
|
|
if dbname == "" {
|
|
|
|
msg = "requires cluster admin"
|
|
|
|
} else {
|
|
|
|
msg = fmt.Sprintf("requires %s privilege on %s", p.Privilege.String(), dbname)
|
|
|
|
}
|
|
|
|
q.Logger.Printf(authErrLogFmt, u.Name, query.String(), database)
|
|
|
|
return ErrAuthorize{
|
|
|
|
text: fmt.Sprintf("%s not authorized to execute '%s'. %s", u.Name, stmt.String(), msg),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ExecuteQuery executes an InfluxQL query against the server.
|
|
|
|
// It sends results down the passed in chan and closes it when done. It will close the chan
|
|
|
|
// on the first statement that throws an error.
|
|
|
|
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (chan *influxql.Result, error) {
|
|
|
|
q.Stats.Add("queriesRx", int64(len(query.Statements)))
|
|
|
|
|
|
|
|
// Execute each statement. Keep the iterator external so we can
|
|
|
|
// track how many of the statements were executed
|
|
|
|
results := make(chan *influxql.Result)
|
|
|
|
go func() {
|
|
|
|
var i int
|
|
|
|
var stmt influxql.Statement
|
|
|
|
for i, stmt = range query.Statements {
|
|
|
|
// If a default database wasn't passed in by the caller,
|
|
|
|
// try to get it from the statement.
|
|
|
|
defaultDB := database
|
|
|
|
if defaultDB == "" {
|
|
|
|
if s, ok := stmt.(influxql.HasDefaultDatabase); ok {
|
|
|
|
defaultDB = s.DefaultDatabase()
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we have a default database, normalize the statement with it.
|
|
|
|
if defaultDB != "" {
|
|
|
|
if err := q.normalizeStatement(stmt, defaultDB); err != nil {
|
|
|
|
results <- &influxql.Result{Err: err}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var res *influxql.Result
|
|
|
|
switch stmt := stmt.(type) {
|
|
|
|
case *influxql.SelectStatement:
|
|
|
|
if err := q.executeSelectStatement(i, stmt, database, results, chunkSize); err != nil {
|
|
|
|
results <- &influxql.Result{Err: err}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
case *influxql.DropSeriesStatement:
|
|
|
|
res = q.executeDropSeriesStatement(stmt, database)
|
|
|
|
case *influxql.ShowSeriesStatement:
|
|
|
|
res = q.executeShowSeriesStatement(stmt, database)
|
|
|
|
case *influxql.DropMeasurementStatement:
|
|
|
|
res = q.executeDropMeasurementStatement(stmt, database)
|
|
|
|
case *influxql.ShowMeasurementsStatement:
|
|
|
|
res = q.executeShowMeasurementsStatement(stmt, database)
|
|
|
|
case *influxql.ShowTagKeysStatement:
|
|
|
|
res = q.executeShowTagKeysStatement(stmt, database)
|
|
|
|
case *influxql.ShowTagValuesStatement:
|
|
|
|
res = q.executeShowTagValuesStatement(stmt, database)
|
|
|
|
case *influxql.ShowFieldKeysStatement:
|
|
|
|
res = q.executeShowFieldKeysStatement(stmt, database)
|
|
|
|
case *influxql.ShowStatsStatement:
|
|
|
|
res = q.executeShowStatsStatement(stmt)
|
|
|
|
case *influxql.ShowDiagnosticsStatement:
|
|
|
|
res = q.executeShowDiagnosticsStatement(stmt)
|
|
|
|
case *influxql.DeleteStatement:
|
|
|
|
res = &influxql.Result{Err: ErrInvalidQuery}
|
|
|
|
default:
|
|
|
|
// Delegate all meta statements to a separate executor.
|
|
|
|
res = q.MetaStatementExecutor.ExecuteStatement(stmt)
|
|
|
|
}
|
|
|
|
|
|
|
|
if res != nil {
|
|
|
|
// set the StatementID for the handler on the other side to combine results
|
|
|
|
res.StatementID = i
|
|
|
|
|
|
|
|
// If an error occurs then stop processing remaining statements.
|
|
|
|
results <- res
|
|
|
|
if res.Err != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
q.Stats.Inc("queriesExecuted")
|
|
|
|
}
|
|
|
|
|
|
|
|
// if there was an error send results that the remaining statements weren't executed
|
|
|
|
for ; i < len(query.Statements)-1; i++ {
|
|
|
|
results <- &influxql.Result{Err: ErrNotExecuted}
|
|
|
|
}
|
|
|
|
|
|
|
|
close(results)
|
|
|
|
}()
|
|
|
|
|
|
|
|
return results, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// executeSelectStatement plans and executes a select statement against a database.
|
|
|
|
func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, database string, results chan *influxql.Result, chunkSize int) error {
|
|
|
|
// Perform any necessary query re-writing.
|
|
|
|
stmt, err := q.rewriteSelectStatement(stmt)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Plan statement execution.
|
|
|
|
p := influxql.NewPlanner(q)
|
|
|
|
e, err := p.Plan(stmt, chunkSize)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Execute plan.
|
|
|
|
ch := e.Execute()
|
|
|
|
|
|
|
|
// Stream results from the channel. We should send an empty result if nothing comes through.
|
|
|
|
resultSent := false
|
|
|
|
for row := range ch {
|
|
|
|
if row.Err != nil {
|
|
|
|
return row.Err
|
|
|
|
} else {
|
|
|
|
resultSent = true
|
|
|
|
results <- &influxql.Result{StatementID: statementID, Series: []*influxql.Row{row}}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !resultSent {
|
|
|
|
results <- &influxql.Result{StatementID: statementID, Series: make([]*influxql.Row, 0)}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// rewriteSelectStatement performs any necessary query re-writing.
|
|
|
|
func (q *QueryExecutor) rewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
// Expand regex expressions in the FROM clause.
|
|
|
|
sources, err := q.expandSources(stmt.Sources)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
stmt.Sources = sources
|
|
|
|
|
|
|
|
// Expand wildcards in the fields or GROUP BY.
|
|
|
|
if stmt.HasWildcard() {
|
|
|
|
stmt, err = q.expandWildcards(stmt)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
stmt.RewriteDistinct()
|
|
|
|
|
|
|
|
return stmt, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// expandWildcards returns a new SelectStatement with wildcards in the fields
|
|
|
|
// and/or GROUP BY exapnded with actual field names.
|
|
|
|
func (q *QueryExecutor) expandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
|
|
|
|
// If there are no wildcards in the statement, return it as-is.
|
|
|
|
if !stmt.HasWildcard() {
|
|
|
|
return stmt, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Use sets to avoid duplicate field names.
|
|
|
|
fieldSet := map[string]struct{}{}
|
|
|
|
dimensionSet := map[string]struct{}{}
|
|
|
|
|
|
|
|
var fields influxql.Fields
|
|
|
|
var dimensions influxql.Dimensions
|
|
|
|
|
|
|
|
// Iterate measurements in the FROM clause getting the fields & dimensions for each.
|
|
|
|
for _, src := range stmt.Sources {
|
|
|
|
if m, ok := src.(*influxql.Measurement); ok {
|
|
|
|
// Lookup the database.
|
|
|
|
db := q.store.DatabaseIndex(m.Database)
|
|
|
|
if db == nil {
|
|
|
|
return nil, ErrDatabaseNotFound(m.Database)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Lookup the measurement in the database.
|
|
|
|
mm := db.measurements[m.Name]
|
|
|
|
if mm == nil {
|
|
|
|
return nil, ErrMeasurementNotFound(m.String())
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the fields for this measurement.
|
|
|
|
for name, _ := range mm.FieldNames {
|
|
|
|
if _, ok := fieldSet[name]; ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
fieldSet[name] = struct{}{}
|
|
|
|
fields = append(fields, &influxql.Field{Expr: &influxql.VarRef{Val: name}})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the dimensions for this measurement.
|
|
|
|
for _, t := range mm.tagKeys() {
|
|
|
|
if _, ok := dimensionSet[t]; ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
dimensionSet[t] = struct{}{}
|
|
|
|
dimensions = append(dimensions, &influxql.Dimension{Expr: &influxql.VarRef{Val: t}})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return a new SelectStatement with the wild cards rewritten.
|
|
|
|
return stmt.RewriteWildcards(fields, dimensions), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// expandSources expands regex sources and removes duplicates.
|
|
|
|
// NOTE: sources must be normalized (db and rp set) before calling this function.
|
|
|
|
func (q *QueryExecutor) expandSources(sources influxql.Sources) (influxql.Sources, error) {
|
|
|
|
// Use a map as a set to prevent duplicates. Two regexes might produce
|
|
|
|
// duplicates when expanded.
|
|
|
|
set := map[string]influxql.Source{}
|
|
|
|
names := []string{}
|
|
|
|
|
|
|
|
// Iterate all sources, expanding regexes when they're found.
|
|
|
|
for _, source := range sources {
|
|
|
|
switch src := source.(type) {
|
|
|
|
case *influxql.Measurement:
|
|
|
|
if src.Regex == nil {
|
|
|
|
name := src.String()
|
|
|
|
set[name] = src
|
|
|
|
names = append(names, name)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Lookup the database.
|
|
|
|
db := q.store.DatabaseIndex(src.Database)
|
|
|
|
if db == nil {
|
|
|
|
return nil, ErrDatabaseNotFound(src.Database)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get measurements from the database that match the regex.
|
|
|
|
measurements := db.measurementsByRegex(src.Regex.Val)
|
|
|
|
|
|
|
|
// Add those measurments to the set.
|
|
|
|
for _, m := range measurements {
|
|
|
|
m2 := &influxql.Measurement{
|
|
|
|
Database: src.Database,
|
|
|
|
RetentionPolicy: src.RetentionPolicy,
|
|
|
|
Name: m.Name,
|
|
|
|
}
|
|
|
|
|
|
|
|
name := m2.String()
|
|
|
|
if _, ok := set[name]; !ok {
|
|
|
|
set[name] = m2
|
|
|
|
names = append(names, name)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("expandSources: unsuported source type: %T", source)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sort the list of source names.
|
|
|
|
sort.Strings(names)
|
|
|
|
|
|
|
|
// Convert set to a list of Sources.
|
|
|
|
expanded := make(influxql.Sources, 0, len(set))
|
|
|
|
for _, name := range names {
|
|
|
|
expanded = append(expanded, set[name])
|
|
|
|
}
|
|
|
|
|
|
|
|
return expanded, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) *influxql.Result {
|
|
|
|
panic("not yet implemented")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) *influxql.Result {
|
|
|
|
panic("not yet implemented")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) *influxql.Result {
|
|
|
|
// Find the database.
|
|
|
|
db := q.store.DatabaseIndex(database)
|
|
|
|
if db == nil {
|
|
|
|
return &influxql.Result{Err: ErrDatabaseNotFound(database)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the list of measurements we're interested in.
|
|
|
|
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create result struct that will be populated and returned.
|
|
|
|
result := &influxql.Result{
|
|
|
|
Series: make(influxql.Rows, 0, len(measurements)),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Loop through measurements to build result. One result row / measurement.
|
|
|
|
for _, m := range measurements {
|
|
|
|
var ids seriesIDs
|
|
|
|
|
|
|
|
if stmt.Condition != nil {
|
|
|
|
// Get series IDs that match the WHERE clause.
|
|
|
|
filters := map[uint64]influxql.Expr{}
|
|
|
|
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If no series matched, then go to the next measurement.
|
|
|
|
if len(ids) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: check return of walkWhereForSeriesIds for fields
|
|
|
|
} else {
|
|
|
|
// No WHERE clause so get all series IDs for this measurement.
|
|
|
|
ids = m.seriesIDs
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make a new row for this measurement.
|
|
|
|
r := &influxql.Row{
|
|
|
|
Name: m.Name,
|
|
|
|
Columns: m.tagKeys(),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Loop through series IDs getting matching tag sets.
|
|
|
|
for _, id := range ids {
|
|
|
|
if s, ok := m.seriesByID[id]; ok {
|
|
|
|
values := make([]interface{}, 0, len(r.Columns)+1)
|
|
|
|
values = append(values, id)
|
|
|
|
for _, column := range r.Columns {
|
|
|
|
values = append(values, s.Tags[column])
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add the tag values to the row.
|
|
|
|
r.Values = append(r.Values, values)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// make the id the first column
|
|
|
|
r.Columns = append([]string{"_id"}, r.Columns...)
|
|
|
|
|
|
|
|
// Append the row to the result.
|
|
|
|
result.Series = append(result.Series, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
if stmt.Limit > 0 || stmt.Offset > 0 {
|
|
|
|
result.Series = q.filterShowSeriesResult(stmt.Limit, stmt.Offset, result.Series)
|
|
|
|
}
|
|
|
|
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
|
|
|
|
// Unlike limit and offset on SELECT statements, the limit and offset don't apply to the number of Rows, but
|
|
|
|
// to the number of total Values returned, since each Value represents a unique series.
|
|
|
|
func (q *QueryExecutor) filterShowSeriesResult(limit, offset int, rows influxql.Rows) influxql.Rows {
|
|
|
|
var filteredSeries influxql.Rows
|
|
|
|
seriesCount := 0
|
|
|
|
for _, r := range rows {
|
|
|
|
var currentSeries [][]interface{}
|
|
|
|
|
|
|
|
// filter the values
|
|
|
|
for _, v := range r.Values {
|
|
|
|
if seriesCount >= offset && seriesCount-offset < limit {
|
|
|
|
currentSeries = append(currentSeries, v)
|
|
|
|
}
|
|
|
|
seriesCount++
|
|
|
|
}
|
|
|
|
|
|
|
|
// only add the row back in if there are some values in it
|
|
|
|
if len(currentSeries) > 0 {
|
|
|
|
r.Values = currentSeries
|
|
|
|
filteredSeries = append(filteredSeries, r)
|
|
|
|
if seriesCount > limit+offset {
|
|
|
|
return filteredSeries
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return filteredSeries
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string) *influxql.Result {
|
|
|
|
// Find the database.
|
|
|
|
db := q.store.DatabaseIndex(database)
|
|
|
|
if db == nil {
|
|
|
|
return &influxql.Result{Err: ErrDatabaseNotFound(database)}
|
|
|
|
}
|
|
|
|
|
|
|
|
var measurements Measurements
|
|
|
|
|
|
|
|
// If a WHERE clause was specified, filter the measurements.
|
|
|
|
if stmt.Condition != nil {
|
|
|
|
var err error
|
|
|
|
measurements, err = db.measurementsByExpr(stmt.Condition)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Otherwise, get all measurements from the database.
|
|
|
|
measurements = db.Measurements()
|
|
|
|
}
|
|
|
|
sort.Sort(measurements)
|
|
|
|
|
|
|
|
offset := stmt.Offset
|
|
|
|
limit := stmt.Limit
|
|
|
|
|
|
|
|
// If OFFSET is past the end of the array, return empty results.
|
|
|
|
if offset > len(measurements)-1 {
|
|
|
|
return &influxql.Result{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate last index based on LIMIT.
|
|
|
|
end := len(measurements)
|
|
|
|
if limit > 0 && offset+limit < end {
|
|
|
|
limit = offset + limit
|
|
|
|
} else {
|
|
|
|
limit = end
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make a result row to hold all measurement names.
|
|
|
|
row := &influxql.Row{
|
|
|
|
Name: "measurements",
|
|
|
|
Columns: []string{"name"},
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add one value to the row for each measurement name.
|
|
|
|
for i := offset; i < limit; i++ {
|
|
|
|
m := measurements[i]
|
|
|
|
v := interface{}(m.Name)
|
|
|
|
row.Values = append(row.Values, []interface{}{v})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make a result.
|
|
|
|
result := &influxql.Result{
|
2015-05-29 21:28:08 +00:00
|
|
|
Series: []influxql.Row{row},
|
2015-05-29 14:31:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string) *influxql.Result {
|
|
|
|
// Find the database.
|
|
|
|
db := q.store.DatabaseIndex(database)
|
|
|
|
if db == nil {
|
|
|
|
return &influxql.Result{Err: ErrDatabaseNotFound(database)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the list of measurements we're interested in.
|
|
|
|
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make result.
|
|
|
|
result := &influxql.Result{
|
|
|
|
Series: make(influxql.Rows, 0, len(measurements)),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add one row per measurement to the result.
|
|
|
|
for _, m := range measurements {
|
|
|
|
// TODO: filter tag keys by stmt.Condition
|
|
|
|
|
|
|
|
// Get the tag keys in sorted order.
|
|
|
|
keys := m.tagKeys()
|
|
|
|
|
|
|
|
// Convert keys to an [][]interface{}.
|
|
|
|
values := make([][]interface{}, 0, len(m.seriesByTagKeyValue))
|
|
|
|
for _, k := range keys {
|
|
|
|
v := interface{}(k)
|
|
|
|
values = append(values, []interface{}{v})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make a result row for the measurement.
|
|
|
|
r := &influxql.Row{
|
|
|
|
Name: m.Name,
|
|
|
|
Columns: []string{"tagKey"},
|
|
|
|
Values: values,
|
|
|
|
}
|
|
|
|
|
|
|
|
result.Series = append(result.Series, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: LIMIT & OFFSET
|
|
|
|
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result {
|
|
|
|
// Find the database.
|
|
|
|
db := q.store.DatabaseIndex(database)
|
|
|
|
if db == nil {
|
|
|
|
return &influxql.Result{Err: ErrDatabaseNotFound(database)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the list of measurements we're interested in.
|
|
|
|
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make result.
|
|
|
|
result := &influxql.Result{
|
|
|
|
Series: make(influxql.Rows, 0),
|
|
|
|
}
|
|
|
|
|
|
|
|
tagValues := make(map[string]stringSet)
|
|
|
|
for _, m := range measurements {
|
|
|
|
var ids seriesIDs
|
|
|
|
|
|
|
|
if stmt.Condition != nil {
|
|
|
|
// Get series IDs that match the WHERE clause.
|
|
|
|
filters := map[uint64]influxql.Expr{}
|
|
|
|
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If no series matched, then go to the next measurement.
|
|
|
|
if len(ids) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: check return of walkWhereForSeriesIds for fields
|
|
|
|
} else {
|
|
|
|
// No WHERE clause so get all series IDs for this measurement.
|
|
|
|
ids = m.seriesIDs
|
|
|
|
}
|
|
|
|
|
|
|
|
for k, v := range m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids) {
|
|
|
|
_, ok := tagValues[k]
|
|
|
|
if !ok {
|
|
|
|
tagValues[k] = v
|
|
|
|
}
|
|
|
|
tagValues[k] = tagValues[k].union(v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for k, v := range tagValues {
|
|
|
|
r := &influxql.Row{
|
|
|
|
Name: k + "TagValues",
|
|
|
|
Columns: []string{k},
|
|
|
|
}
|
|
|
|
|
|
|
|
vals := v.list()
|
|
|
|
sort.Strings(vals)
|
|
|
|
|
|
|
|
for _, val := range vals {
|
|
|
|
v := interface{}(val)
|
|
|
|
r.Values = append(r.Values, []interface{}{v})
|
|
|
|
}
|
|
|
|
|
|
|
|
result.Series = append(result.Series, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Sort(result.Series)
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) *influxql.Result {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
// Find the database.
|
|
|
|
db := q.store.DatabaseIndex(database)
|
|
|
|
if db == nil {
|
|
|
|
return &influxql.Result{Err: ErrDatabaseNotFound(database)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the list of measurements we're interested in.
|
|
|
|
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make result.
|
|
|
|
result := &influxql.Result{
|
|
|
|
Series: make(influxql.Rows, 0, len(measurements)),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Loop through measurements, adding a result row for each.
|
|
|
|
for _, m := range measurements {
|
|
|
|
// Create a new row.
|
|
|
|
r := &influxql.Row{
|
|
|
|
Name: m.Name,
|
|
|
|
Columns: []string{"fieldKey"},
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get a list of field names from the measurement then sort them.
|
|
|
|
names := make([]string, 0, len(m.FieldNames))
|
|
|
|
for n, _ := range m.FieldNames {
|
|
|
|
names = append(names, n)
|
|
|
|
}
|
|
|
|
sort.Strings(names)
|
|
|
|
|
|
|
|
// Add the field names to the result row values.
|
|
|
|
for _, n := range names {
|
|
|
|
v := interface{}(n)
|
|
|
|
r.Values = append(r.Values, []interface{}{v})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Append the row to the result.
|
|
|
|
result.Series = append(result.Series, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
// measurementsFromSourceOrDB returns a list of measurements from the
|
|
|
|
// statement passed in or, if the statement is nil, a list of all
|
|
|
|
// measurement names from the database passed in.
|
|
|
|
func measurementsFromSourceOrDB(stmt influxql.Source, db *DatabaseIndex) (Measurements, error) {
|
|
|
|
var measurements Measurements
|
|
|
|
if stmt != nil {
|
|
|
|
// TODO: handle multiple measurement sources
|
|
|
|
if m, ok := stmt.(*influxql.Measurement); ok {
|
|
|
|
measurement := db.measurements[m.Name]
|
|
|
|
if measurement == nil {
|
|
|
|
return nil, ErrMeasurementNotFound(m.Name)
|
|
|
|
}
|
|
|
|
|
|
|
|
measurements = append(measurements, measurement)
|
|
|
|
} else {
|
|
|
|
return nil, errors.New("identifiers in FROM clause must be measurement names")
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// No measurements specified in FROM clause so get all measurements that have series.
|
|
|
|
for _, m := range db.Measurements() {
|
|
|
|
if len(m.seriesIDs) > 0 {
|
|
|
|
measurements = append(measurements, m)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sort.Sort(measurements)
|
|
|
|
|
|
|
|
return measurements, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// normalizeStatement adds a default database and policy to the measurements in statement.
|
|
|
|
func (q *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error) {
|
|
|
|
// Track prefixes for replacing field names.
|
|
|
|
prefixes := make(map[string]string)
|
|
|
|
|
|
|
|
// Qualify all measurements.
|
|
|
|
influxql.WalkFunc(stmt, func(n influxql.Node) {
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
switch n := n.(type) {
|
|
|
|
case *influxql.Measurement:
|
|
|
|
e := q.normalizeMeasurement(n, defaultDatabase)
|
|
|
|
if e != nil {
|
|
|
|
err = e
|
|
|
|
return
|
|
|
|
}
|
|
|
|
prefixes[n.Name] = n.Name
|
|
|
|
}
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Replace all variable references that used measurement prefixes.
|
|
|
|
influxql.WalkFunc(stmt, func(n influxql.Node) {
|
|
|
|
switch n := n.(type) {
|
|
|
|
case *influxql.VarRef:
|
|
|
|
for k, v := range prefixes {
|
|
|
|
if strings.HasPrefix(n.Val, k+".") {
|
|
|
|
n.Val = v + "." + influxql.QuoteIdent(n.Val[len(k)+1:])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// normalizeMeasurement inserts the default database or policy into all measurement names.
|
|
|
|
func (q *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
|
|
|
|
if defaultDatabase == "" {
|
|
|
|
return errors.New("no default database specified")
|
|
|
|
}
|
|
|
|
if m.Name == "" && m.Regex == nil {
|
|
|
|
return errors.New("invalid measurement")
|
|
|
|
}
|
|
|
|
|
|
|
|
if m.Database == "" {
|
|
|
|
m.Database = defaultDatabase
|
|
|
|
}
|
|
|
|
|
|
|
|
// Find database.
|
|
|
|
di, err := q.MetaStore.Database(m.Database)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
} else if di == nil {
|
|
|
|
return ErrDatabaseNotFound(m.Database)
|
|
|
|
}
|
|
|
|
|
|
|
|
// If no retention policy was specified, use the default.
|
|
|
|
if m.RetentionPolicy == "" {
|
|
|
|
if di.DefaultRetentionPolicy == "" {
|
|
|
|
return fmt.Errorf("default retention policy not set for: %s", di.Name)
|
|
|
|
}
|
|
|
|
m.RetentionPolicy = di.DefaultRetentionPolicy
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result {
|
|
|
|
var rows []*influxql.Row
|
|
|
|
// Server stats.
|
|
|
|
serverRow := &influxql.Row{Columns: []string{}}
|
|
|
|
serverRow.Name = q.Stats.Name()
|
|
|
|
var values []interface{}
|
|
|
|
q.Stats.Walk(func(k string, v int64) {
|
|
|
|
serverRow.Columns = append(serverRow.Columns, k)
|
|
|
|
values = append(values, v)
|
|
|
|
})
|
|
|
|
serverRow.Values = append(serverRow.Values, values)
|
|
|
|
rows = append(rows, serverRow)
|
|
|
|
|
|
|
|
// Shard-level stats.
|
|
|
|
// TODO: wire up shard level stats
|
|
|
|
// for _, sh := range s.shards {
|
|
|
|
// if sh.store == nil {
|
|
|
|
// // No stats for non-local shards
|
|
|
|
// continue
|
|
|
|
// }
|
|
|
|
|
|
|
|
// row := &influxql.Row{Columns: []string{}}
|
|
|
|
// row.Name = sh.stats.Name()
|
|
|
|
// var values []interface{}
|
|
|
|
// sh.stats.Walk(func(k string, v int64) {
|
|
|
|
// row.Columns = append(row.Columns, k)
|
|
|
|
// values = append(values, v)
|
|
|
|
// })
|
|
|
|
// row.Values = append(row.Values, values)
|
|
|
|
// rows = append(rows, row)
|
|
|
|
// }
|
|
|
|
|
|
|
|
return &influxql.Result{Series: rows}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *QueryExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) *influxql.Result {
|
|
|
|
return &influxql.Result{Series: q.DiagnosticsAsRows()}
|
|
|
|
}
|
|
|
|
|
|
|
|
// DiagnosticsAsRows returns diagnostic information about the server, as a slice of
|
|
|
|
// InfluxQL rows.
|
|
|
|
func (q *QueryExecutor) DiagnosticsAsRows() []*influxql.Row {
|
|
|
|
panic("not yet implemented")
|
|
|
|
/*
|
|
|
|
s.mu.RLock()
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
now := time.Now().UTC()
|
|
|
|
|
|
|
|
// Common rows.
|
|
|
|
gd := NewGoDiagnostics()
|
|
|
|
sd := NewSystemDiagnostics()
|
|
|
|
md := NewMemoryDiagnostics()
|
|
|
|
bd := BuildDiagnostics{Version: s.Version, CommitHash: s.CommitHash}
|
|
|
|
|
|
|
|
// Common tagset.
|
|
|
|
tags := map[string]string{"serverID": strconv.FormatUint(s.id, 10)}
|
|
|
|
|
|
|
|
// Server row.
|
|
|
|
serverRow := &influxql.Row{
|
|
|
|
Name: "server_diag",
|
|
|
|
Columns: []string{"time", "startTime", "uptime", "id",
|
|
|
|
"path", "authEnabled", "index", "retentionAutoCreate", "numShards", "cqLastRun"},
|
|
|
|
Tags: tags,
|
|
|
|
Values: [][]interface{}{[]interface{}{now, startTime.String(), time.Since(startTime).String(), strconv.FormatUint(s.id, 10),
|
|
|
|
s.path, s.authenticationEnabled, int64(s.index), s.RetentionAutoCreate, len(s.shards), s.lastContinuousQueryRun.String()}},
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shard groups.
|
|
|
|
shardGroupsRow := &influxql.Row{Columns: []string{}}
|
|
|
|
shardGroupsRow.Name = "shardGroups_diag"
|
|
|
|
shardGroupsRow.Columns = append(shardGroupsRow.Columns, "time", "database", "retentionPolicy", "id",
|
|
|
|
"startTime", "endTime", "duration", "numShards")
|
|
|
|
shardGroupsRow.Tags = tags
|
|
|
|
// Check all shard groups.
|
|
|
|
for _, db := range s.databases {
|
|
|
|
for _, rp := range db.policies {
|
|
|
|
for _, g := range rp.shardGroups {
|
|
|
|
shardGroupsRow.Values = append(shardGroupsRow.Values, []interface{}{now, db.name, rp.Name,
|
|
|
|
strconv.FormatUint(g.ID, 10), g.StartTime.String(), g.EndTime.String(), g.Duration().String(), len(g.Shards)})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shards
|
|
|
|
shardsRow := &influxql.Row{Columns: []string{}}
|
|
|
|
shardsRow.Name = "shards_diag"
|
|
|
|
shardsRow.Columns = append(shardsRow.Columns, "time", "id", "dataNodes", "index", "path")
|
|
|
|
shardsRow.Tags = tags
|
|
|
|
for _, sh := range s.shards {
|
|
|
|
var nodes []string
|
|
|
|
for _, n := range sh.DataNodeIDs {
|
|
|
|
nodes = append(nodes, strconv.FormatUint(n, 10))
|
|
|
|
}
|
|
|
|
var path string
|
|
|
|
if sh.HasDataNodeID(s.id) {
|
|
|
|
path = sh.store.Path()
|
|
|
|
}
|
|
|
|
shardsRow.Values = append(shardsRow.Values, []interface{}{now, strconv.FormatUint(sh.ID, 10),
|
|
|
|
strings.Join(nodes, ","), strconv.FormatUint(sh.Index(), 10), path})
|
|
|
|
}
|
|
|
|
|
|
|
|
return []*influxql.Row{
|
|
|
|
gd.AsRow("server_go", tags),
|
|
|
|
sd.AsRow("server_system", tags),
|
|
|
|
md.AsRow("server_memory", tags),
|
|
|
|
bd.AsRow("server_build", tags),
|
|
|
|
serverRow,
|
|
|
|
shardGroupsRow,
|
|
|
|
shardsRow,
|
|
|
|
}
|
|
|
|
*/
|
|
|
|
}
|
|
|
|
|
|
|
|
// ErrAuthorize is the error value used to report an authorization failure.
type ErrAuthorize struct {
	// text is the message returned by Error.
	text string
}

// Error implements the error interface and returns the error's text.
func (e ErrAuthorize) Error() string {
	return e.text
}

// authorize is an empty marker method. According to the note that came with
// it, it makes ErrAuthorize satisfy isAuthorizationError, which is not
// declared in this file (TODO confirm where it lives).
func (ErrAuthorize) authorize() {}
|
|
|
|
|
|
|
|
// Sentinel errors placed in query results.
var (
	// ErrInvalidQuery is returned when executing an unknown query type.
	ErrInvalidQuery = errors.New("invalid query")

	// ErrNotExecuted is returned for a statement that was not executed,
	// which can occur when a previous statement in the same query has
	// errored.
	ErrNotExecuted = errors.New("not executed")
)
|
|
|
|
|
|
|
|
// ErrDatabaseNotFound returns an error, built with Errorf, stating that the
// database called name was not found.
func ErrDatabaseNotFound(name string) error {
	return Errorf("database not found: %s", name)
}
|
|
|
|
|
|
|
|
// ErrMeasurementNotFound returns an error, built with Errorf, stating that
// the measurement called name was not found.
func ErrMeasurementNotFound(name string) error {
	return Errorf("measurement not found: %s", name)
}
|
|
|
|
|
|
|
|
// Errorf formats an error like fmt.Errorf and, when the call stack can be
// inspected, appends the source location " (file:line)" to the message.
//
// The location is taken two frames above Errorf (runtime.Caller(2)). When
// Errorf is called through a one-level helper such as ErrDatabaseNotFound,
// that is the helper's caller. If the frame is not available the plain
// formatted error is returned.
//
// The argument slice passed by the caller is never modified.
func Errorf(format string, a ...interface{}) (err error) {
	_, file, line, ok := runtime.Caller(2)
	if !ok {
		return fmt.Errorf(format, a...)
	}

	// Capping the capacity forces append to allocate a new array. Without
	// it, a caller passing args[:n]... with spare capacity would have the
	// elements after n overwritten by file and line.
	args := append(a[:len(a):len(a)], file, line)
	return fmt.Errorf(format+" (%s:%d)", args...)
}
|