863 lines
25 KiB
Go
863 lines
25 KiB
Go
package coordinator
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/authorizer"
|
|
iql "github.com/influxdata/influxdb/v2/influxql"
|
|
"github.com/influxdata/influxdb/v2/influxql/query"
|
|
errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
|
"github.com/influxdata/influxdb/v2/models"
|
|
"github.com/influxdata/influxdb/v2/pkg/tracing"
|
|
"github.com/influxdata/influxdb/v2/pkg/tracing/fields"
|
|
"github.com/influxdata/influxdb/v2/tsdb"
|
|
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
|
"github.com/influxdata/influxql"
|
|
)
|
|
|
|
// ErrDatabaseNameRequired is returned when executing statements that require a database,
|
|
// when a database has not been provided.
|
|
var ErrDatabaseNameRequired = errors.New("database name required")
|
|
|
|
// StatementExecutor executes a statement in the query.
|
|
type StatementExecutor struct {
|
|
MetaClient MetaClient
|
|
|
|
// TSDB storage for local node.
|
|
TSDBStore TSDBStore
|
|
|
|
// ShardMapper for mapping shards when executing a SELECT statement.
|
|
ShardMapper query.ShardMapper
|
|
|
|
DBRP influxdb.DBRPMappingService
|
|
|
|
// Select statement limits
|
|
MaxSelectPointN int
|
|
MaxSelectSeriesN int
|
|
MaxSelectBucketsN int
|
|
}
|
|
|
|
// ExecuteStatement executes the given statement with the given execution context.
|
|
func (e *StatementExecutor) ExecuteStatement(ctx context.Context, stmt influxql.Statement, ectx *query.ExecutionContext) error {
|
|
// Select statements are handled separately so that they can be streamed.
|
|
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
|
|
return e.executeSelectStatement(ctx, stmt, ectx)
|
|
}
|
|
|
|
var rows models.Rows
|
|
var messages []*query.Message
|
|
var err error
|
|
switch stmt := stmt.(type) {
|
|
case *influxql.AlterRetentionPolicyStatement:
|
|
err = iql.ErrNotImplemented("ALTER RETENTION POLICY")
|
|
case *influxql.CreateContinuousQueryStatement:
|
|
err = iql.ErrNotImplemented("CREATE CONTINUOUS QUERY")
|
|
case *influxql.CreateDatabaseStatement:
|
|
err = iql.ErrNotImplemented("CREATE DATABASE")
|
|
case *influxql.CreateRetentionPolicyStatement:
|
|
err = iql.ErrNotImplemented("CREATE RETENTION POLICY")
|
|
case *influxql.CreateSubscriptionStatement:
|
|
err = iql.ErrNotImplemented("CREATE SUBSCRIPTION")
|
|
case *influxql.CreateUserStatement:
|
|
err = iql.ErrNotImplemented("CREATE USER")
|
|
case *influxql.DeleteSeriesStatement:
|
|
return e.executeDeleteSeriesStatement(ctx, stmt, ectx.Database, ectx)
|
|
case *influxql.DropContinuousQueryStatement:
|
|
err = iql.ErrNotImplemented("DROP CONTINUOUS QUERY")
|
|
case *influxql.DropDatabaseStatement:
|
|
err = iql.ErrNotImplemented("DROP DATABASE")
|
|
case *influxql.DropMeasurementStatement:
|
|
return e.executeDropMeasurementStatement(ctx, stmt, ectx.Database, ectx)
|
|
case *influxql.DropSeriesStatement:
|
|
err = iql.ErrNotImplemented("DROP SERIES")
|
|
case *influxql.DropRetentionPolicyStatement:
|
|
err = iql.ErrNotImplemented("DROP RETENTION POLICY")
|
|
case *influxql.DropShardStatement:
|
|
err = iql.ErrNotImplemented("DROP SHARD")
|
|
case *influxql.DropSubscriptionStatement:
|
|
err = iql.ErrNotImplemented("DROP SUBSCRIPTION")
|
|
case *influxql.DropUserStatement:
|
|
err = iql.ErrNotImplemented("DROP USER")
|
|
case *influxql.ExplainStatement:
|
|
if stmt.Analyze {
|
|
rows, err = e.executeExplainAnalyzeStatement(ctx, stmt, ectx)
|
|
} else {
|
|
rows, err = e.executeExplainStatement(ctx, stmt, ectx)
|
|
}
|
|
case *influxql.GrantStatement:
|
|
err = iql.ErrNotImplemented("GRANT")
|
|
case *influxql.GrantAdminStatement:
|
|
err = iql.ErrNotImplemented("GRANT ALL")
|
|
case *influxql.RevokeStatement:
|
|
err = iql.ErrNotImplemented("REVOKE")
|
|
case *influxql.RevokeAdminStatement:
|
|
err = iql.ErrNotImplemented("REVOKE ALL")
|
|
case *influxql.ShowContinuousQueriesStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW CONTINUOUS QUERIES")
|
|
case *influxql.ShowDatabasesStatement:
|
|
rows, err = e.executeShowDatabasesStatement(ctx, stmt, ectx)
|
|
case *influxql.ShowDiagnosticsStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW DIAGNOSTICS")
|
|
case *influxql.ShowGrantsForUserStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW GRANTS")
|
|
case *influxql.ShowMeasurementsStatement:
|
|
return e.executeShowMeasurementsStatement(ctx, stmt, ectx)
|
|
case *influxql.ShowMeasurementCardinalityStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW MEASUREMENT CARDINALITY")
|
|
case *influxql.ShowRetentionPoliciesStatement:
|
|
rows, err = e.executeShowRetentionPoliciesStatement(ctx, stmt, ectx)
|
|
case *influxql.ShowSeriesCardinalityStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW SERIES CARDINALITY")
|
|
case *influxql.ShowShardsStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW SHARDS")
|
|
case *influxql.ShowShardGroupsStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW SHARD GROUPS")
|
|
case *influxql.ShowStatsStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW STATS")
|
|
case *influxql.ShowSubscriptionsStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW SUBSCRIPTIONS")
|
|
case *influxql.ShowTagKeysStatement:
|
|
return e.executeShowTagKeys(ctx, stmt, ectx)
|
|
case *influxql.ShowTagValuesStatement:
|
|
return e.executeShowTagValues(ctx, stmt, ectx)
|
|
case *influxql.ShowUsersStatement:
|
|
rows, err = nil, iql.ErrNotImplemented("SHOW USERS")
|
|
case *influxql.SetPasswordUserStatement:
|
|
err = iql.ErrNotImplemented("SET PASSWORD")
|
|
case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement:
|
|
err = iql.ErrNotImplemented("SHOW QUERIES")
|
|
default:
|
|
return query.ErrInvalidQuery
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return ectx.Send(ctx, &query.Result{
|
|
Series: rows,
|
|
Messages: messages,
|
|
})
|
|
}
|
|
|
|
func (e *StatementExecutor) executeExplainStatement(ctx context.Context, q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
|
|
opt := query.SelectOptions{
|
|
OrgID: ectx.OrgID,
|
|
NodeID: ectx.ExecutionOptions.NodeID,
|
|
MaxSeriesN: e.MaxSelectSeriesN,
|
|
MaxBucketsN: e.MaxSelectBucketsN,
|
|
}
|
|
|
|
// Prepare the query for execution, but do not actually execute it.
|
|
// This should perform any needed substitutions.
|
|
p, err := query.Prepare(ctx, q.Statement, e.ShardMapper, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer p.Close()
|
|
|
|
plan, err := p.Explain(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
plan = strings.TrimSpace(plan)
|
|
|
|
row := &models.Row{
|
|
Columns: []string{"QUERY PLAN"},
|
|
}
|
|
for _, s := range strings.Split(plan, "\n") {
|
|
row.Values = append(row.Values, []interface{}{s})
|
|
}
|
|
return models.Rows{row}, nil
|
|
}
|
|
|
|
func (e *StatementExecutor) executeExplainAnalyzeStatement(ctx context.Context, q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
|
|
stmt := q.Statement
|
|
t, span := tracing.NewTrace("select")
|
|
ctx = tracing.NewContextWithTrace(ctx, t)
|
|
ctx = tracing.NewContextWithSpan(ctx, span)
|
|
var aux query.Iterators
|
|
ctx = query.NewContextWithIterators(ctx, &aux)
|
|
start := time.Now()
|
|
|
|
cur, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions, ectx.StatisticsGatherer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
iterTime := time.Since(start)
|
|
|
|
// Generate a row emitter from the iterator set.
|
|
em := query.NewEmitter(cur, ectx.ChunkSize)
|
|
|
|
// Emit rows to the results channel.
|
|
var writeN int64
|
|
for {
|
|
var row *models.Row
|
|
row, _, err = em.Emit()
|
|
if err != nil {
|
|
goto CLEANUP
|
|
} else if row == nil {
|
|
// Check if the query was interrupted while emitting.
|
|
if err = ctx.Err(); err != nil {
|
|
goto CLEANUP
|
|
}
|
|
break
|
|
}
|
|
|
|
writeN += int64(len(row.Values))
|
|
}
|
|
|
|
CLEANUP:
|
|
em.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// close auxiliary iterators deterministically to finalize any captured measurements
|
|
aux.Close()
|
|
|
|
totalTime := time.Since(start)
|
|
span.MergeFields(
|
|
fields.Duration("total_time", totalTime),
|
|
fields.Duration("planning_time", iterTime),
|
|
fields.Duration("execution_time", totalTime-iterTime),
|
|
)
|
|
span.Finish()
|
|
|
|
row := &models.Row{
|
|
Columns: []string{"EXPLAIN ANALYZE"},
|
|
}
|
|
for _, s := range strings.Split(t.Tree().String(), "\n") {
|
|
row.Values = append(row.Values, []interface{}{s})
|
|
}
|
|
|
|
return models.Rows{row}, nil
|
|
}
|
|
|
|
func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error {
|
|
cur, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions, ectx.StatisticsGatherer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Generate a row emitter from the iterator set.
|
|
em := query.NewEmitter(cur, ectx.ChunkSize)
|
|
defer em.Close()
|
|
|
|
// Emit rows to the results channel.
|
|
var emitted bool
|
|
|
|
if stmt.Target != nil {
|
|
// SELECT INTO is unsupported
|
|
return iql.ErrNotImplemented("SELECT INTO")
|
|
}
|
|
|
|
for {
|
|
row, partial, err := em.Emit()
|
|
if err != nil {
|
|
return err
|
|
} else if row == nil {
|
|
// Check if the query was interrupted while emitting.
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
break
|
|
}
|
|
|
|
result := &query.Result{
|
|
Series: []*models.Row{row},
|
|
Partial: partial,
|
|
}
|
|
|
|
// Send results or exit if closing.
|
|
if err := ectx.Send(ctx, result); err != nil {
|
|
return err
|
|
}
|
|
|
|
emitted = true
|
|
}
|
|
|
|
// Always emit at least one result.
|
|
if !emitted {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Series: make([]*models.Row, 0),
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions, gatherer *iql.StatisticsGatherer) (query.Cursor, error) {
|
|
defer func(start time.Time) {
|
|
dur := time.Since(start)
|
|
gatherer.Append(iql.NewImmutableCollector(iql.Statistics{PlanDuration: dur}))
|
|
}(time.Now())
|
|
|
|
sopt := query.SelectOptions{
|
|
OrgID: opt.OrgID,
|
|
NodeID: opt.NodeID,
|
|
MaxSeriesN: e.MaxSelectSeriesN,
|
|
MaxPointN: e.MaxSelectPointN,
|
|
MaxBucketsN: e.MaxSelectBucketsN,
|
|
StatisticsGatherer: gatherer,
|
|
}
|
|
|
|
// Create a set of iterators from a selection.
|
|
cur, err := query.Select(ctx, stmt, e.ShardMapper, sopt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cur, nil
|
|
}
|
|
|
|
func (e *StatementExecutor) executeShowDatabasesStatement(ctx context.Context, q *influxql.ShowDatabasesStatement, ectx *query.ExecutionContext) (models.Rows, error) {
|
|
row := &models.Row{Name: "databases", Columns: []string{"name"}}
|
|
dbrps, _, err := e.DBRP.FindMany(ctx, influxdb.DBRPMappingFilter{
|
|
OrgID: &ectx.OrgID,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
seenDbs := make(map[string]struct{}, len(dbrps))
|
|
for _, dbrp := range dbrps {
|
|
if _, ok := seenDbs[dbrp.Database]; ok {
|
|
continue
|
|
}
|
|
|
|
perm, err := influxdb.NewPermissionAtID(dbrp.BucketID, influxdb.ReadAction, influxdb.BucketsResourceType, dbrp.OrganizationID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = authorizer.IsAllowed(ctx, *perm)
|
|
if err != nil {
|
|
if errors2.ErrorCode(err) == errors2.EUnauthorized {
|
|
continue
|
|
}
|
|
return nil, err
|
|
}
|
|
seenDbs[dbrp.Database] = struct{}{}
|
|
row.Values = append(row.Values, []interface{}{dbrp.Database})
|
|
}
|
|
return []*models.Row{row}, nil
|
|
}
|
|
|
|
func (e *StatementExecutor) getDefaultRP(ctx context.Context, database string, ectx *query.ExecutionContext) (*influxdb.DBRPMapping, error) {
|
|
defaultRP := true
|
|
mappings, n, err := e.DBRP.FindMany(ctx, influxdb.DBRPMappingFilter{
|
|
OrgID: &ectx.OrgID,
|
|
Database: &database,
|
|
Default: &defaultRP,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("finding DBRP mappings: %v", err)
|
|
} else if n == 0 {
|
|
return nil, fmt.Errorf("default retention policy not set for: %s", database)
|
|
} else if n != 1 {
|
|
return nil, fmt.Errorf("finding DBRP mappings: expected 1, found %d", n)
|
|
}
|
|
return mappings[0], nil
|
|
}
|
|
|
|
func (e *StatementExecutor) executeDeleteSeriesStatement(ctx context.Context, q *influxql.DeleteSeriesStatement, database string, ectx *query.ExecutionContext) error {
|
|
mapping, err := e.getDefaultRP(ctx, database, ectx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Require write for DELETE queries
|
|
_, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID)
|
|
if err != nil {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Err: fmt.Errorf("insufficient permissions"),
|
|
})
|
|
}
|
|
|
|
// Convert "now()" to current time.
|
|
q.Condition = influxql.Reduce(q.Condition, &influxql.NowValuer{Now: time.Now().UTC()})
|
|
|
|
return e.TSDBStore.DeleteSeries(ctx, mapping.BucketID.String(), q.Sources, q.Condition)
|
|
}
|
|
|
|
func (e *StatementExecutor) executeDropMeasurementStatement(ctx context.Context, q *influxql.DropMeasurementStatement, database string, ectx *query.ExecutionContext) error {
|
|
mapping, err := e.getDefaultRP(ctx, database, ectx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Require write for DROP MEASUREMENT queries
|
|
_, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID)
|
|
if err != nil {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Err: fmt.Errorf("insufficient permissions"),
|
|
})
|
|
}
|
|
|
|
return e.TSDBStore.DeleteMeasurement(ctx, mapping.BucketID.String(), q.Name)
|
|
}
|
|
|
|
type measurementRow struct {
|
|
name []byte
|
|
db, rp string
|
|
}
|
|
|
|
func (e *StatementExecutor) executeShowMeasurementsStatement(ctx context.Context, q *influxql.ShowMeasurementsStatement, ectx *query.ExecutionContext) error {
|
|
if q.Database == "" && !q.WildcardDatabase {
|
|
return ErrDatabaseNameRequired
|
|
}
|
|
|
|
if q.WildcardDatabase {
|
|
// We could support this but it doesn't seem very useful.
|
|
if q.RetentionPolicy != "" {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Err: fmt.Errorf("query 'SHOW MEASUREMENTS ON *.rp' not supported. use 'ON *.*' or specify a database"),
|
|
})
|
|
}
|
|
// It is not clear how '*' should interact with the default retention policy, so reject it
|
|
if !q.WildcardRetentionPolicy {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Err: fmt.Errorf("query 'SHOW MEASUREMENTS ON *' not supported. use 'ON *.*' or specify a database"),
|
|
})
|
|
}
|
|
}
|
|
|
|
onlyPrintMeasurements := !(q.WildcardDatabase || q.WildcardRetentionPolicy || q.RetentionPolicy != "")
|
|
|
|
mappingsFilter := influxdb.DBRPMappingFilter{
|
|
OrgID: &ectx.OrgID,
|
|
}
|
|
|
|
if !q.WildcardDatabase {
|
|
mappingsFilter.Database = &q.Database
|
|
}
|
|
if !q.WildcardRetentionPolicy {
|
|
if q.RetentionPolicy == "" {
|
|
defaultRP := true
|
|
mappingsFilter.Default = &defaultRP
|
|
} else {
|
|
mappingsFilter.RetentionPolicy = &q.RetentionPolicy
|
|
}
|
|
}
|
|
mappings, _, err := e.DBRP.FindMany(ctx, mappingsFilter)
|
|
if err != nil {
|
|
return fmt.Errorf("finding DBRP mappings: %v", err)
|
|
}
|
|
|
|
rows := make([]measurementRow, 0)
|
|
|
|
// Sort the sources for consistent output
|
|
sort.Slice(mappings, func(i, j int) bool {
|
|
if mappings[i].Database != mappings[j].Database {
|
|
return mappings[i].Database < mappings[j].Database
|
|
}
|
|
return mappings[i].RetentionPolicy < mappings[j].RetentionPolicy
|
|
})
|
|
|
|
for _, mapping := range mappings {
|
|
names, err := e.TSDBStore.MeasurementNames(ctx, ectx.Authorizer, mapping.BucketID.String(), q.Condition)
|
|
if err != nil {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Err: err,
|
|
})
|
|
}
|
|
for _, name := range names {
|
|
rows = append(rows, measurementRow{
|
|
name: name,
|
|
db: mapping.Database,
|
|
rp: mapping.RetentionPolicy,
|
|
})
|
|
}
|
|
}
|
|
|
|
if q.Offset > 0 {
|
|
if q.Offset >= len(rows) {
|
|
rows = nil
|
|
} else {
|
|
rows = rows[q.Offset:]
|
|
}
|
|
}
|
|
|
|
if q.Limit > 0 {
|
|
if q.Limit < len(rows) {
|
|
rows = rows[:q.Limit]
|
|
}
|
|
}
|
|
|
|
if len(rows) == 0 {
|
|
return ectx.Send(ctx, &query.Result{})
|
|
}
|
|
|
|
if onlyPrintMeasurements {
|
|
values := make([][]interface{}, len(rows))
|
|
for i, r := range rows {
|
|
values[i] = []interface{}{string(r.name)}
|
|
}
|
|
|
|
return ectx.Send(ctx, &query.Result{
|
|
Series: []*models.Row{{
|
|
Name: "measurements",
|
|
Columns: []string{"name"},
|
|
Values: values,
|
|
}},
|
|
})
|
|
}
|
|
|
|
values := make([][]interface{}, len(rows))
|
|
for i, r := range rows {
|
|
values[i] = []interface{}{string(r.name), r.db, r.rp}
|
|
}
|
|
|
|
return ectx.Send(ctx, &query.Result{
|
|
Series: []*models.Row{{
|
|
Name: "measurements",
|
|
Columns: []string{"name", "database", "retention policy"},
|
|
Values: values,
|
|
}},
|
|
})
|
|
|
|
}
|
|
|
|
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(ctx context.Context, q *influxql.ShowRetentionPoliciesStatement, ectx *query.ExecutionContext) (models.Rows, error) {
|
|
if q.Database == "" {
|
|
return nil, ErrDatabaseNameRequired
|
|
}
|
|
|
|
dbrps, _, err := e.DBRP.FindMany(ctx, influxdb.DBRPMappingFilter{
|
|
OrgID: &ectx.OrgID,
|
|
Database: &q.Database,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}}
|
|
for _, dbrp := range dbrps {
|
|
perm, err := influxdb.NewPermissionAtID(dbrp.BucketID, influxdb.ReadAction, influxdb.BucketsResourceType, dbrp.OrganizationID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = authorizer.IsAllowed(ctx, *perm)
|
|
if err != nil {
|
|
if errors2.ErrorCode(err) == errors2.EUnauthorized {
|
|
continue
|
|
}
|
|
return nil, err
|
|
}
|
|
row.Values = append(row.Values, []interface{}{dbrp.RetentionPolicy, "0s", "168h0m0s", 1, dbrp.Default})
|
|
}
|
|
|
|
return []*models.Row{row}, nil
|
|
}
|
|
|
|
func (e *StatementExecutor) executeShowTagKeys(ctx context.Context, q *influxql.ShowTagKeysStatement, ectx *query.ExecutionContext) error {
|
|
if q.Database == "" {
|
|
return ErrDatabaseNameRequired
|
|
}
|
|
|
|
mapping, err := e.getDefaultRP(ctx, q.Database, ectx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Determine shard set based on database and time range.
|
|
// SHOW TAG KEYS returns all tag keys for the default retention policy.
|
|
di := e.MetaClient.Database(mapping.BucketID.String())
|
|
if di == nil {
|
|
return fmt.Errorf("database not found: %s", q.Database)
|
|
}
|
|
|
|
// Determine appropriate time range. If one or fewer time boundaries provided
|
|
// then min/max possible time should be used instead.
|
|
valuer := &influxql.NowValuer{Now: time.Now()}
|
|
cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get all shards for all retention policies.
|
|
var allGroups []meta.ShardGroupInfo
|
|
for _, rpi := range di.RetentionPolicies {
|
|
sgis, err := e.MetaClient.ShardGroupsByTimeRange(mapping.BucketID.String(), rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
allGroups = append(allGroups, sgis...)
|
|
}
|
|
|
|
var shardIDs []uint64
|
|
for _, sgi := range allGroups {
|
|
for _, si := range sgi.Shards {
|
|
shardIDs = append(shardIDs, si.ID)
|
|
}
|
|
}
|
|
|
|
tagKeys, err := e.TSDBStore.TagKeys(ctx, ectx.Authorizer, shardIDs, cond)
|
|
if err != nil {
|
|
return ectx.Send(ctx, &query.Result{
|
|
Err: err,
|
|
})
|
|
}
|
|
|
|
emitted := false
|
|
for _, m := range tagKeys {
|
|
keys := m.Keys
|
|
|
|
if q.Offset > 0 {
|
|
if q.Offset >= len(keys) {
|
|
keys = nil
|
|
} else {
|
|
keys = keys[q.Offset:]
|
|
}
|
|
}
|
|
if q.Limit > 0 && q.Limit < len(keys) {
|
|
keys = keys[:q.Limit]
|
|
}
|
|
|
|
if len(keys) == 0 {
|
|
continue
|
|
}
|
|
|
|
row := &models.Row{
|
|
Name: m.Measurement,
|
|
Columns: []string{"tagKey"},
|
|
Values: make([][]interface{}, len(keys)),
|
|
}
|
|
for i, key := range keys {
|
|
row.Values[i] = []interface{}{key}
|
|
}
|
|
|
|
if err := ectx.Send(ctx, &query.Result{
|
|
Series: []*models.Row{row},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
emitted = true
|
|
}
|
|
|
|
// Ensure at least one result is emitted.
|
|
if !emitted {
|
|
return ectx.Send(ctx, &query.Result{})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *StatementExecutor) executeShowTagValues(ctx context.Context, q *influxql.ShowTagValuesStatement, ectx *query.ExecutionContext) error {
|
|
if q.Database == "" {
|
|
return ErrDatabaseNameRequired
|
|
}
|
|
|
|
mapping, err := e.getDefaultRP(ctx, q.Database, ectx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Determine shard set based on database and time range.
|
|
// SHOW TAG VALUES returns all tag values for the default retention policy.
|
|
di := e.MetaClient.Database(mapping.BucketID.String())
|
|
if di == nil {
|
|
return fmt.Errorf("database not found: %s", q.Database)
|
|
}
|
|
|
|
// Determine appropriate time range. If one or fewer time boundaries provided
|
|
// then min/max possible time should be used instead.
|
|
valuer := &influxql.NowValuer{Now: time.Now()}
|
|
cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get all shards for all retention policies.
|
|
var allGroups []meta.ShardGroupInfo
|
|
for _, rpi := range di.RetentionPolicies {
|
|
sgis, err := e.MetaClient.ShardGroupsByTimeRange(mapping.BucketID.String(), rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
allGroups = append(allGroups, sgis...)
|
|
}
|
|
|
|
var shardIDs []uint64
|
|
for _, sgi := range allGroups {
|
|
for _, si := range sgi.Shards {
|
|
shardIDs = append(shardIDs, si.ID)
|
|
}
|
|
}
|
|
|
|
tagValues, err := e.TSDBStore.TagValues(ctx, ectx.Authorizer, shardIDs, cond)
|
|
if err != nil {
|
|
return ectx.Send(ctx, &query.Result{Err: err})
|
|
}
|
|
|
|
emitted := false
|
|
for _, m := range tagValues {
|
|
values := m.Values
|
|
|
|
if q.Offset > 0 {
|
|
if q.Offset >= len(values) {
|
|
values = nil
|
|
} else {
|
|
values = values[q.Offset:]
|
|
}
|
|
}
|
|
|
|
if q.Limit > 0 {
|
|
if q.Limit < len(values) {
|
|
values = values[:q.Limit]
|
|
}
|
|
}
|
|
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
|
|
row := &models.Row{
|
|
Name: m.Measurement,
|
|
Columns: []string{"key", "value"},
|
|
Values: make([][]interface{}, len(values)),
|
|
}
|
|
for i, v := range values {
|
|
row.Values[i] = []interface{}{v.Key, v.Value}
|
|
}
|
|
|
|
if err := ectx.Send(ctx, &query.Result{
|
|
Series: []*models.Row{row},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
emitted = true
|
|
}
|
|
|
|
// Ensure at least one result is emitted.
|
|
if !emitted {
|
|
return ectx.Send(ctx, &query.Result{})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NormalizeStatement adds a default database and policy to the measurements in statement.
|
|
// Parameter defaultRetentionPolicy can be "".
|
|
func (e *StatementExecutor) NormalizeStatement(ctx context.Context, stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string, ectx *query.ExecutionContext) (err error) {
|
|
influxql.WalkFunc(stmt, func(node influxql.Node) {
|
|
if err != nil {
|
|
return
|
|
}
|
|
switch node := node.(type) {
|
|
case *influxql.ShowRetentionPoliciesStatement:
|
|
if node.Database == "" {
|
|
node.Database = defaultDatabase
|
|
}
|
|
case *influxql.ShowMeasurementsStatement:
|
|
if node.Database == "" {
|
|
node.Database = defaultDatabase
|
|
}
|
|
case *influxql.ShowTagKeysStatement:
|
|
if node.Database == "" {
|
|
node.Database = defaultDatabase
|
|
}
|
|
case *influxql.ShowTagValuesStatement:
|
|
if node.Database == "" {
|
|
node.Database = defaultDatabase
|
|
}
|
|
case *influxql.ShowMeasurementCardinalityStatement:
|
|
if node.Database == "" {
|
|
node.Database = defaultDatabase
|
|
}
|
|
case *influxql.ShowSeriesCardinalityStatement:
|
|
if node.Database == "" {
|
|
node.Database = defaultDatabase
|
|
}
|
|
case *influxql.Measurement:
|
|
switch stmt.(type) {
|
|
case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement:
|
|
// DB and RP not supported by these statements so don't rewrite into invalid
|
|
// statements
|
|
default:
|
|
err = e.normalizeMeasurement(ctx, node, defaultDatabase, defaultRetentionPolicy, ectx)
|
|
}
|
|
}
|
|
})
|
|
return
|
|
}
|
|
|
|
func (e *StatementExecutor) normalizeMeasurement(ctx context.Context, m *influxql.Measurement, defaultDatabase, defaultRetentionPolicy string, ectx *query.ExecutionContext) error {
|
|
// Targets (measurements in an INTO clause) can have blank names, which means it will be
|
|
// the same as the measurement name it came from in the FROM clause.
|
|
if !m.IsTarget && m.Name == "" && m.SystemIterator == "" && m.Regex == nil {
|
|
return errors.New("invalid measurement")
|
|
}
|
|
|
|
// Measurement does not have an explicit database? Insert default.
|
|
if m.Database == "" {
|
|
m.Database = defaultDatabase
|
|
}
|
|
|
|
// The database must now be specified by this point.
|
|
if m.Database == "" {
|
|
return ErrDatabaseNameRequired
|
|
}
|
|
|
|
// TODO(sgc): Validate database; fetch default RP
|
|
filter := influxdb.DBRPMappingFilter{
|
|
OrgID: &ectx.OrgID,
|
|
Database: &m.Database,
|
|
}
|
|
|
|
res, _, err := e.DBRP.FindMany(ctx, filter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(res) == 0 {
|
|
return query.ErrDatabaseNotFound(m.Database)
|
|
}
|
|
|
|
// If no retention policy was specified, use the default.
|
|
if m.RetentionPolicy == "" {
|
|
if defaultRetentionPolicy != "" {
|
|
m.RetentionPolicy = defaultRetentionPolicy
|
|
} else if rp := mappings(res).DefaultRetentionPolicy(m.Database); rp != "" {
|
|
m.RetentionPolicy = rp
|
|
} else {
|
|
return fmt.Errorf("default retention policy not set for: %s", m.Database)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type mappings []*influxdb.DBRPMapping
|
|
|
|
func (m mappings) DefaultRetentionPolicy(db string) string {
|
|
for _, v := range m {
|
|
if v.Database == db && v.Default {
|
|
return v.RetentionPolicy
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// TSDBStore is an interface for accessing the time series data store.
|
|
type TSDBStore interface {
|
|
DeleteMeasurement(ctx context.Context, database, name string) error
|
|
DeleteSeries(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
|
|
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
|
|
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
|
|
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
|
|
}
|
|
|
|
var _ TSDBStore = LocalTSDBStore{}
|
|
|
|
// LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator
|
|
// to satisfy the TSDBStore interface.
|
|
type LocalTSDBStore struct {
|
|
*tsdb.Store
|
|
}
|