2016-05-11 16:32:56 +00:00
|
|
|
package coordinator
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
2016-04-29 00:29:09 +00:00
|
|
|
"io"
|
2016-02-12 22:10:02 +00:00
|
|
|
"sort"
|
|
|
|
"strconv"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/influxdata/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/influxql"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
|
|
"github.com/influxdata/influxdb/monitor"
|
|
|
|
"github.com/influxdata/influxdb/services/meta"
|
2016-04-05 19:33:44 +00:00
|
|
|
"github.com/influxdata/influxdb/tsdb"
|
2016-02-12 22:10:02 +00:00
|
|
|
)
|
|
|
|
|
2016-07-28 22:38:08 +00:00
|
|
|
// ErrDatabaseNameRequired is returned when a statement that operates on a
// database is issued without a database name.
var ErrDatabaseNameRequired = errors.New("database name required")
|
|
|
|
|
2016-05-26 16:32:56 +00:00
|
|
|
// pointsWriter is the subset of the points-writer API the executor needs in
// order to write SELECT ... INTO results back into the system.
type pointsWriter interface {
	WritePointsInto(*IntoWriteRequest) error
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
// StatementExecutor executes a statement in the query.
type StatementExecutor struct {
	// MetaClient provides access to cluster metadata: databases, retention
	// policies, users, subscriptions, and shard placement.
	MetaClient MetaClient

	// TaskManager holds the StatementExecutor that handles task-related commands.
	TaskManager influxql.StatementExecutor

	// TSDB storage for local node.
	TSDBStore TSDBStore

	// Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
	Monitor *monitor.Monitor

	// Used for rewriting points back into system for SELECT INTO statements.
	PointsWriter pointsWriter

	// Select statement limits. A zero value disables the corresponding limit.
	MaxSelectPointN   int
	MaxSelectSeriesN  int
	MaxSelectBucketsN int
}
|
|
|
|
|
2016-06-07 17:28:33 +00:00
|
|
|
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
|
2016-03-31 22:12:29 +00:00
|
|
|
// Select statements are handled separately so that they can be streamed.
|
|
|
|
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
|
2016-06-07 17:28:33 +00:00
|
|
|
return e.executeSelectStatement(stmt, &ctx)
|
2016-03-08 18:54:32 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
var rows models.Rows
|
2016-04-13 14:44:59 +00:00
|
|
|
var messages []*influxql.Message
|
2016-03-31 22:12:29 +00:00
|
|
|
var err error
|
|
|
|
switch stmt := stmt.(type) {
|
|
|
|
case *influxql.AlterRetentionPolicyStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeAlterRetentionPolicyStatement(stmt)
|
|
|
|
case *influxql.CreateContinuousQueryStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeCreateContinuousQueryStatement(stmt)
|
|
|
|
case *influxql.CreateDatabaseStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeCreateDatabaseStatement(stmt)
|
|
|
|
case *influxql.CreateRetentionPolicyStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeCreateRetentionPolicyStatement(stmt)
|
|
|
|
case *influxql.CreateSubscriptionStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeCreateSubscriptionStatement(stmt)
|
|
|
|
case *influxql.CreateUserStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeCreateUserStatement(stmt)
|
2016-04-26 21:43:10 +00:00
|
|
|
case *influxql.DeleteSeriesStatement:
|
|
|
|
err = e.executeDeleteSeriesStatement(stmt, ctx.Database)
|
2016-03-31 22:12:29 +00:00
|
|
|
case *influxql.DropContinuousQueryStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropContinuousQueryStatement(stmt)
|
|
|
|
case *influxql.DropDatabaseStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropDatabaseStatement(stmt)
|
|
|
|
case *influxql.DropMeasurementStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropMeasurementStatement(stmt, ctx.Database)
|
|
|
|
case *influxql.DropSeriesStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropSeriesStatement(stmt, ctx.Database)
|
|
|
|
case *influxql.DropRetentionPolicyStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropRetentionPolicyStatement(stmt)
|
|
|
|
case *influxql.DropShardStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropShardStatement(stmt)
|
|
|
|
case *influxql.DropSubscriptionStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropSubscriptionStatement(stmt)
|
|
|
|
case *influxql.DropUserStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeDropUserStatement(stmt)
|
|
|
|
case *influxql.GrantStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeGrantStatement(stmt)
|
|
|
|
case *influxql.GrantAdminStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeGrantAdminStatement(stmt)
|
|
|
|
case *influxql.RevokeStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeRevokeStatement(stmt)
|
|
|
|
case *influxql.RevokeAdminStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeRevokeAdminStatement(stmt)
|
|
|
|
case *influxql.ShowContinuousQueriesStatement:
|
|
|
|
rows, err = e.executeShowContinuousQueriesStatement(stmt)
|
|
|
|
case *influxql.ShowDatabasesStatement:
|
|
|
|
rows, err = e.executeShowDatabasesStatement(stmt)
|
|
|
|
case *influxql.ShowDiagnosticsStatement:
|
|
|
|
rows, err = e.executeShowDiagnosticsStatement(stmt)
|
|
|
|
case *influxql.ShowGrantsForUserStatement:
|
|
|
|
rows, err = e.executeShowGrantsForUserStatement(stmt)
|
2016-07-28 22:38:08 +00:00
|
|
|
case *influxql.ShowMeasurementsStatement:
|
|
|
|
return e.executeShowMeasurementsStatement(stmt, &ctx)
|
2016-03-31 22:12:29 +00:00
|
|
|
case *influxql.ShowRetentionPoliciesStatement:
|
|
|
|
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
|
|
|
|
case *influxql.ShowShardsStatement:
|
|
|
|
rows, err = e.executeShowShardsStatement(stmt)
|
|
|
|
case *influxql.ShowShardGroupsStatement:
|
|
|
|
rows, err = e.executeShowShardGroupsStatement(stmt)
|
|
|
|
case *influxql.ShowStatsStatement:
|
|
|
|
rows, err = e.executeShowStatsStatement(stmt)
|
|
|
|
case *influxql.ShowSubscriptionsStatement:
|
|
|
|
rows, err = e.executeShowSubscriptionsStatement(stmt)
|
2016-07-28 22:38:08 +00:00
|
|
|
case *influxql.ShowTagValuesStatement:
|
|
|
|
return e.executeShowTagValues(stmt, &ctx)
|
2016-03-31 22:12:29 +00:00
|
|
|
case *influxql.ShowUsersStatement:
|
|
|
|
rows, err = e.executeShowUsersStatement(stmt)
|
|
|
|
case *influxql.SetPasswordUserStatement:
|
2016-04-29 13:00:21 +00:00
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
err = e.executeSetPasswordUserStatement(stmt)
|
2016-06-10 17:30:49 +00:00
|
|
|
case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement:
|
|
|
|
// Send query related statements to the task manager.
|
|
|
|
return e.TaskManager.ExecuteStatement(stmt, ctx)
|
2016-03-31 22:12:29 +00:00
|
|
|
default:
|
|
|
|
return influxql.ErrInvalidQuery
|
|
|
|
}
|
2016-02-12 22:10:02 +00:00
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
Series: rows,
|
2016-04-13 14:44:59 +00:00
|
|
|
Messages: messages,
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
2016-03-31 22:12:29 +00:00
|
|
|
return nil
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
rpu := &meta.RetentionPolicyUpdate{
|
2016-03-20 13:44:13 +00:00
|
|
|
Duration: stmt.Duration,
|
|
|
|
ReplicaN: stmt.Replication,
|
|
|
|
ShardGroupDuration: stmt.ShardGroupDuration,
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Update the retention policy.
|
|
|
|
if err := e.MetaClient.UpdateRetentionPolicy(stmt.Database, stmt.Name, rpu); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// If requested, set as default retention policy.
|
|
|
|
if stmt.Default {
|
|
|
|
if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, stmt.Name); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) error {
|
2016-05-20 16:37:10 +00:00
|
|
|
// Verify that retention policies exist.
|
|
|
|
var err error
|
|
|
|
verifyRPFn := func(n influxql.Node) {
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
switch m := n.(type) {
|
|
|
|
case *influxql.Measurement:
|
|
|
|
var rp *meta.RetentionPolicyInfo
|
|
|
|
if rp, err = e.MetaClient.RetentionPolicy(m.Database, m.RetentionPolicy); err != nil {
|
|
|
|
return
|
|
|
|
} else if rp == nil {
|
|
|
|
err = fmt.Errorf("%s: %s.%s", meta.ErrRetentionPolicyNotFound, m.Database, m.RetentionPolicy)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
influxql.WalkFunc(q, verifyRPFn)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.CreateContinuousQuery(q.Database, q.Name, q.String())
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
if !stmt.RetentionPolicyCreate {
|
|
|
|
_, err := e.MetaClient.CreateDatabase(stmt.Name)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-08-09 17:00:04 +00:00
|
|
|
spec := meta.RetentionPolicySpec{
|
|
|
|
Name: stmt.RetentionPolicyName,
|
|
|
|
Duration: stmt.RetentionPolicyDuration,
|
|
|
|
ReplicaN: stmt.RetentionPolicyReplication,
|
|
|
|
ShardGroupDuration: stmt.RetentionPolicyShardGroupDuration,
|
|
|
|
}
|
|
|
|
_, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, &spec)
|
2016-02-12 22:10:02 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {
|
2016-08-09 17:00:04 +00:00
|
|
|
spec := meta.RetentionPolicySpec{
|
|
|
|
Name: stmt.Name,
|
|
|
|
Duration: &stmt.Duration,
|
|
|
|
ReplicaN: &stmt.Replication,
|
|
|
|
ShardGroupDuration: stmt.ShardGroupDuration,
|
|
|
|
}
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
// Create new retention policy.
|
2016-08-09 17:00:04 +00:00
|
|
|
rp, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec)
|
|
|
|
if err != nil {
|
2016-02-12 22:10:02 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// If requested, set new policy as the default.
|
|
|
|
if stmt.Default {
|
2016-08-09 17:00:04 +00:00
|
|
|
if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, rp.Name); err != nil {
|
2016-02-12 22:10:02 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.CreateSubscription(q.Database, q.RetentionPolicy, q.Name, q.Mode, q.Destinations)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
_, err := e.MetaClient.CreateUser(q.Name, q.Password, q.Admin)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-04-26 21:43:10 +00:00
|
|
|
func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSeriesStatement, database string) error {
|
2016-04-30 22:04:38 +00:00
|
|
|
if dbi := e.MetaClient.Database(database); dbi == nil {
|
2016-04-26 21:43:10 +00:00
|
|
|
return influxql.ErrDatabaseNotFound(database)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Convert "now()" to current time.
|
|
|
|
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})
|
|
|
|
|
|
|
|
// Locally delete the series.
|
2016-04-29 22:31:57 +00:00
|
|
|
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
|
2016-04-26 21:43:10 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.DropContinuousQuery(q.Database, q.Name)
|
|
|
|
}
|
|
|
|
|
2016-02-19 14:00:17 +00:00
|
|
|
// executeDropDatabaseStatement drops a database from the cluster.
|
|
|
|
// It does not return an error if the database was not found on any of
|
|
|
|
// the nodes, or in the Meta store.
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error {
|
2016-06-07 18:14:04 +00:00
|
|
|
// Locally delete the datababse.
|
|
|
|
if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil {
|
2016-02-12 22:10:02 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-06-07 18:14:04 +00:00
|
|
|
// Remove the database from the Meta Store.
|
|
|
|
return e.MetaClient.DropDatabase(stmt.Name)
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error {
|
2016-04-30 22:04:38 +00:00
|
|
|
if dbi := e.MetaClient.Database(database); dbi == nil {
|
2016-02-19 14:00:17 +00:00
|
|
|
return influxql.ErrDatabaseNotFound(database)
|
2016-02-15 13:00:58 +00:00
|
|
|
}
|
|
|
|
|
2016-02-19 14:00:17 +00:00
|
|
|
// Locally drop the measurement
|
2016-03-08 19:59:33 +00:00
|
|
|
return e.TSDBStore.DeleteMeasurement(database, stmt.Name)
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) error {
|
2016-04-30 22:04:38 +00:00
|
|
|
if dbi := e.MetaClient.Database(database); dbi == nil {
|
2016-02-19 14:00:17 +00:00
|
|
|
return influxql.ErrDatabaseNotFound(database)
|
|
|
|
}
|
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
// Check for time in WHERE clause (not supported).
|
|
|
|
if influxql.HasTimeExpr(stmt.Condition) {
|
|
|
|
return errors.New("DROP SERIES doesn't support time in WHERE clause")
|
|
|
|
}
|
|
|
|
|
2016-02-19 14:00:17 +00:00
|
|
|
// Locally drop the series.
|
2016-04-29 22:31:57 +00:00
|
|
|
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
|
2016-06-07 18:14:04 +00:00
|
|
|
// Locally delete the shard.
|
|
|
|
if err := e.TSDBStore.DeleteShard(stmt.ID); err != nil {
|
2016-03-11 15:53:15 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-06-07 18:14:04 +00:00
|
|
|
// Remove the shard reference from the Meta Store.
|
|
|
|
return e.MetaClient.DropShard(stmt.ID)
|
2016-03-11 15:53:15 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropRetentionPolicyStatement) error {
|
2016-06-07 18:14:04 +00:00
|
|
|
// Locally drop the retention policy.
|
|
|
|
if err := e.TSDBStore.DeleteRetentionPolicy(stmt.Database, stmt.Name); err != nil {
|
2016-02-19 14:00:17 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-06-07 18:14:04 +00:00
|
|
|
return e.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name)
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.DropSubscription(q.Database, q.RetentionPolicy, q.Name)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.DropUser(q.Name)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.SetAdminPrivilege(stmt.User, true)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
priv := influxql.NoPrivileges
|
|
|
|
|
|
|
|
// Revoking all privileges means there's no need to look at existing user privileges.
|
|
|
|
if stmt.Privilege != influxql.AllPrivileges {
|
|
|
|
p, err := e.MetaClient.UserPrivilege(stmt.User, stmt.On)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// Bit clear (AND NOT) the user's privilege with the revoked privilege.
|
|
|
|
priv = *p &^ stmt.Privilege
|
|
|
|
}
|
|
|
|
|
|
|
|
return e.MetaClient.SetPrivilege(stmt.User, stmt.On, priv)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.SetAdminPrivilege(stmt.User, false)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
return e.MetaClient.UpdateUser(q.Name, q.Password)
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
|
2016-07-18 17:53:21 +00:00
|
|
|
itrs, stmt, err := e.createIterators(stmt, ctx)
|
2016-03-28 17:26:13 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
// Generate a row emitter from the iterator set.
|
2016-03-31 22:12:29 +00:00
|
|
|
em := influxql.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
|
2016-02-12 22:10:02 +00:00
|
|
|
em.Columns = stmt.ColumnNames()
|
|
|
|
em.OmitTime = stmt.OmitTime
|
|
|
|
defer em.Close()
|
|
|
|
|
|
|
|
// Emit rows to the results channel.
|
|
|
|
var writeN int64
|
|
|
|
var emitted bool
|
2016-05-26 16:32:56 +00:00
|
|
|
|
|
|
|
var pointsWriter *BufferedPointsWriter
|
|
|
|
if stmt.Target != nil {
|
|
|
|
pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000)
|
|
|
|
}
|
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
for {
|
2016-04-17 20:00:59 +00:00
|
|
|
row, err := em.Emit()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
} else if row == nil {
|
2016-03-21 16:00:07 +00:00
|
|
|
// Check if the query was interrupted while emitting.
|
|
|
|
select {
|
2016-03-31 22:12:29 +00:00
|
|
|
case <-ctx.InterruptCh:
|
2016-03-21 16:00:07 +00:00
|
|
|
return influxql.ErrQueryInterrupted
|
|
|
|
default:
|
|
|
|
}
|
2016-02-12 22:10:02 +00:00
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write points back into system for INTO statements.
|
|
|
|
if stmt.Target != nil {
|
2016-05-26 16:32:56 +00:00
|
|
|
if err := e.writeInto(pointsWriter, stmt, row); err != nil {
|
2016-02-12 22:10:02 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
writeN += int64(len(row.Values))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2016-04-29 13:00:21 +00:00
|
|
|
result := &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
Series: []*models.Row{row},
|
|
|
|
}
|
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
// Send results or exit if closing.
|
|
|
|
select {
|
2016-03-31 22:12:29 +00:00
|
|
|
case <-ctx.InterruptCh:
|
2016-03-21 16:00:07 +00:00
|
|
|
return influxql.ErrQueryInterrupted
|
2016-03-31 22:12:29 +00:00
|
|
|
case ctx.Results <- result:
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
emitted = true
|
|
|
|
}
|
|
|
|
|
2016-10-13 05:43:38 +00:00
|
|
|
// Flush remaining points and emit write count if an INTO statement.
|
2016-02-12 22:10:02 +00:00
|
|
|
if stmt.Target != nil {
|
2016-05-26 16:32:56 +00:00
|
|
|
if err := pointsWriter.Flush(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-04-29 13:00:21 +00:00
|
|
|
var messages []*influxql.Message
|
|
|
|
if ctx.ReadOnly {
|
|
|
|
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
2016-04-29 13:00:21 +00:00
|
|
|
Messages: messages,
|
2016-02-12 22:10:02 +00:00
|
|
|
Series: []*models.Row{{
|
|
|
|
Name: "result",
|
|
|
|
Columns: []string{"time", "written"},
|
|
|
|
Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
|
|
|
|
}},
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Always emit at least one result.
|
|
|
|
if !emitted {
|
2016-03-31 22:12:29 +00:00
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
2016-02-12 22:10:02 +00:00
|
|
|
Series: make([]*models.Row, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-07-18 17:53:21 +00:00
|
|
|
func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) ([]influxql.Iterator, *influxql.SelectStatement, error) {
|
|
|
|
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
|
|
|
|
now := time.Now().UTC()
|
|
|
|
opt := influxql.SelectOptions{
|
|
|
|
InterruptCh: ctx.InterruptCh,
|
|
|
|
NodeID: ctx.ExecutionOptions.NodeID,
|
2016-08-08 16:39:38 +00:00
|
|
|
MaxSeriesN: e.MaxSelectSeriesN,
|
2016-07-18 17:53:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Replace instances of "now()" with the current time, and check the resultant times.
|
|
|
|
nowValuer := influxql.NowValuer{Now: now}
|
|
|
|
stmt.Condition = influxql.Reduce(stmt.Condition, &nowValuer)
|
|
|
|
// Replace instances of "now()" with the current time in the dimensions.
|
|
|
|
for _, d := range stmt.Dimensions {
|
|
|
|
d.Expr = influxql.Reduce(d.Expr, &nowValuer)
|
|
|
|
}
|
|
|
|
|
|
|
|
var err error
|
|
|
|
opt.MinTime, opt.MaxTime, err = influxql.TimeRange(stmt.Condition)
|
|
|
|
if err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if opt.MaxTime.IsZero() {
|
|
|
|
// In the case that we're executing a meta query where the user cannot
|
|
|
|
// specify a time condition, then we expand the default max time
|
|
|
|
// to the maximum possible value, to ensure that data where all points
|
|
|
|
// are in the future are returned.
|
|
|
|
if influxql.Sources(stmt.Sources).HasSystemSource() {
|
|
|
|
opt.MaxTime = time.Unix(0, influxql.MaxTime).UTC()
|
|
|
|
} else {
|
2016-10-25 20:05:44 +00:00
|
|
|
if interval, err := stmt.GroupByInterval(); err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
} else if interval > 0 {
|
|
|
|
opt.MaxTime = now
|
|
|
|
} else {
|
|
|
|
opt.MaxTime = time.Unix(0, influxql.MaxTime).UTC()
|
|
|
|
}
|
2016-07-18 17:53:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if opt.MinTime.IsZero() {
|
2016-08-25 17:52:39 +00:00
|
|
|
opt.MinTime = time.Unix(0, influxql.MinTime).UTC()
|
2016-07-18 17:53:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Convert DISTINCT into a call.
|
|
|
|
stmt.RewriteDistinct()
|
|
|
|
|
|
|
|
// Remove "time" from fields list.
|
|
|
|
stmt.RewriteTimeFields()
|
|
|
|
|
2016-10-20 16:18:04 +00:00
|
|
|
// Rewrite any regex conditions that could make use of the index.
|
|
|
|
stmt.RewriteRegexConditions()
|
|
|
|
|
2016-07-18 17:53:21 +00:00
|
|
|
// Create an iterator creator based on the shards in the cluster.
|
|
|
|
ic, err := e.iteratorCreator(stmt, &opt)
|
|
|
|
if err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Expand regex sources to their actual source names.
|
|
|
|
if stmt.Sources.HasRegex() {
|
|
|
|
sources, err := ic.ExpandSources(stmt.Sources)
|
|
|
|
if err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
}
|
|
|
|
stmt.Sources = sources
|
|
|
|
}
|
|
|
|
|
|
|
|
// Rewrite wildcards, if any exist.
|
|
|
|
tmp, err := stmt.RewriteFields(ic)
|
|
|
|
if err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
}
|
|
|
|
stmt = tmp
|
|
|
|
|
|
|
|
if e.MaxSelectBucketsN > 0 && !stmt.IsRawQuery {
|
|
|
|
interval, err := stmt.GroupByInterval()
|
|
|
|
if err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if interval > 0 {
|
|
|
|
// Determine the start and end time matched to the interval (may not match the actual times).
|
|
|
|
min := opt.MinTime.Truncate(interval)
|
|
|
|
max := opt.MaxTime.Truncate(interval).Add(interval)
|
|
|
|
|
|
|
|
// Determine the number of buckets by finding the time span and dividing by the interval.
|
|
|
|
buckets := int64(max.Sub(min)) / int64(interval)
|
|
|
|
if int(buckets) > e.MaxSelectBucketsN {
|
2016-10-28 20:44:33 +00:00
|
|
|
return nil, stmt, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, e.MaxSelectBucketsN)
|
2016-07-18 17:53:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a set of iterators from a selection.
|
|
|
|
itrs, err := influxql.Select(stmt, ic, &opt)
|
|
|
|
if err != nil {
|
|
|
|
return nil, stmt, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if e.MaxSelectPointN > 0 {
|
|
|
|
monitor := influxql.PointLimitMonitor(itrs, influxql.DefaultStatsInterval, e.MaxSelectPointN)
|
|
|
|
ctx.Query.Monitor(monitor)
|
|
|
|
}
|
|
|
|
return itrs, stmt, nil
|
|
|
|
}
|
|
|
|
|
2016-02-19 20:38:02 +00:00
|
|
|
// iteratorCreator returns a new instance of IteratorCreator based on stmt.
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
|
2016-02-19 20:38:02 +00:00
|
|
|
// Retrieve a list of shard IDs.
|
|
|
|
shards, err := e.MetaClient.ShardsByTimeRange(stmt.Sources, opt.MinTime, opt.MaxTime)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-06-10 15:14:21 +00:00
|
|
|
return e.TSDBStore.IteratorCreator(shards, opt)
|
2016-02-19 20:38:02 +00:00
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
|
2016-04-30 22:04:38 +00:00
|
|
|
dis := e.MetaClient.Databases()
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
rows := []*models.Row{}
|
|
|
|
for _, di := range dis {
|
|
|
|
row := &models.Row{Columns: []string{"name", "query"}, Name: di.Name}
|
|
|
|
for _, cqi := range di.ContinuousQueries {
|
|
|
|
row.Values = append(row.Values, []interface{}{cqi.Name, cqi.Query})
|
|
|
|
}
|
|
|
|
rows = append(rows, row)
|
|
|
|
}
|
|
|
|
return rows, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) (models.Rows, error) {
|
2016-04-30 22:04:38 +00:00
|
|
|
dis := e.MetaClient.Databases()
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
row := &models.Row{Name: "databases", Columns: []string{"name"}}
|
|
|
|
for _, di := range dis {
|
|
|
|
row.Values = append(row.Values, []interface{}{di.Name})
|
|
|
|
}
|
|
|
|
return []*models.Row{row}, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) {
|
2016-02-12 22:10:02 +00:00
|
|
|
diags, err := e.Monitor.Diagnostics()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get a sorted list of diagnostics keys.
|
|
|
|
sortedKeys := make([]string, 0, len(diags))
|
|
|
|
for k := range diags {
|
|
|
|
sortedKeys = append(sortedKeys, k)
|
|
|
|
}
|
|
|
|
sort.Strings(sortedKeys)
|
|
|
|
|
|
|
|
rows := make([]*models.Row, 0, len(diags))
|
|
|
|
for _, k := range sortedKeys {
|
|
|
|
if stmt.Module != "" && k != stmt.Module {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &models.Row{Name: k}
|
|
|
|
|
|
|
|
row.Columns = diags[k].Columns
|
|
|
|
row.Values = diags[k].Rows
|
|
|
|
rows = append(rows, row)
|
|
|
|
}
|
|
|
|
return rows, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) {
|
2016-02-12 22:10:02 +00:00
|
|
|
priv, err := e.MetaClient.UserPrivileges(q.Name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &models.Row{Columns: []string{"database", "privilege"}}
|
|
|
|
for d, p := range priv {
|
|
|
|
row.Values = append(row.Values, []interface{}{d, p.String()})
|
|
|
|
}
|
|
|
|
return []*models.Row{row}, nil
|
|
|
|
}
|
|
|
|
|
2016-07-28 22:38:08 +00:00
|
|
|
func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMeasurementsStatement, ctx *influxql.ExecutionContext) error {
|
2016-09-13 20:36:56 +00:00
|
|
|
if q.Database == "" {
|
2016-07-28 22:38:08 +00:00
|
|
|
return ErrDatabaseNameRequired
|
|
|
|
}
|
|
|
|
|
2016-09-13 20:36:56 +00:00
|
|
|
measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition)
|
2016-07-28 22:38:08 +00:00
|
|
|
if err != nil || len(measurements) == 0 {
|
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
Err: err,
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if q.Offset > 0 {
|
|
|
|
if q.Offset >= len(measurements) {
|
|
|
|
measurements = nil
|
|
|
|
} else {
|
|
|
|
measurements = measurements[q.Offset:]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if q.Limit > 0 {
|
|
|
|
if q.Limit < len(measurements) {
|
|
|
|
measurements = measurements[:q.Limit]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
values := make([][]interface{}, len(measurements))
|
|
|
|
for i, m := range measurements {
|
|
|
|
values[i] = []interface{}{m}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(values) == 0 {
|
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
Series: []*models.Row{{
|
|
|
|
Name: "measurements",
|
|
|
|
Columns: []string{"name"},
|
|
|
|
Values: values,
|
|
|
|
}},
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
|
2016-09-13 20:36:56 +00:00
|
|
|
if q.Database == "" {
|
|
|
|
return nil, ErrDatabaseNameRequired
|
|
|
|
}
|
|
|
|
|
2016-04-30 22:04:38 +00:00
|
|
|
di := e.MetaClient.Database(q.Database)
|
|
|
|
if di == nil {
|
2016-02-12 22:10:02 +00:00
|
|
|
return nil, influxdb.ErrDatabaseNotFound(q.Database)
|
|
|
|
}
|
|
|
|
|
2016-03-20 13:44:13 +00:00
|
|
|
row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}}
|
2016-02-12 22:10:02 +00:00
|
|
|
for _, rpi := range di.RetentionPolicies {
|
2016-03-20 13:44:13 +00:00
|
|
|
row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
return []*models.Row{row}, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) {
|
2016-04-30 22:04:38 +00:00
|
|
|
dis := e.MetaClient.Databases()
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
rows := []*models.Row{}
|
|
|
|
for _, di := range dis {
|
|
|
|
row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name}
|
|
|
|
for _, rpi := range di.RetentionPolicies {
|
|
|
|
for _, sgi := range rpi.ShardGroups {
|
|
|
|
// Shards associated with deleted shard groups are effectively deleted.
|
|
|
|
// Don't list them.
|
|
|
|
if sgi.Deleted() {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, si := range sgi.Shards {
|
|
|
|
ownerIDs := make([]uint64, len(si.Owners))
|
|
|
|
for i, owner := range si.Owners {
|
|
|
|
ownerIDs[i] = owner.NodeID
|
|
|
|
}
|
|
|
|
|
|
|
|
row.Values = append(row.Values, []interface{}{
|
|
|
|
si.ID,
|
|
|
|
di.Name,
|
|
|
|
rpi.Name,
|
|
|
|
sgi.ID,
|
|
|
|
sgi.StartTime.UTC().Format(time.RFC3339),
|
|
|
|
sgi.EndTime.UTC().Format(time.RFC3339),
|
|
|
|
sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
|
|
|
|
joinUint64(ownerIDs),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
rows = append(rows, row)
|
|
|
|
}
|
|
|
|
return rows, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) {
|
2016-04-30 22:04:38 +00:00
|
|
|
dis := e.MetaClient.Databases()
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
row := &models.Row{Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}, Name: "shard groups"}
|
|
|
|
for _, di := range dis {
|
|
|
|
for _, rpi := range di.RetentionPolicies {
|
|
|
|
for _, sgi := range rpi.ShardGroups {
|
|
|
|
// Shards associated with deleted shard groups are effectively deleted.
|
|
|
|
// Don't list them.
|
|
|
|
if sgi.Deleted() {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
row.Values = append(row.Values, []interface{}{
|
|
|
|
sgi.ID,
|
|
|
|
di.Name,
|
|
|
|
rpi.Name,
|
|
|
|
sgi.StartTime.UTC().Format(time.RFC3339),
|
|
|
|
sgi.EndTime.UTC().Format(time.RFC3339),
|
|
|
|
sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return []*models.Row{row}, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) {
|
2016-02-12 22:10:02 +00:00
|
|
|
stats, err := e.Monitor.Statistics(nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var rows []*models.Row
|
|
|
|
for _, stat := range stats {
|
|
|
|
if stmt.Module != "" && stat.Name != stmt.Module {
|
|
|
|
continue
|
|
|
|
}
|
2016-08-19 10:12:35 +00:00
|
|
|
row := &models.Row{Name: stat.Name, Tags: stat.Tags}
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
values := make([]interface{}, 0, len(stat.Values))
|
|
|
|
for _, k := range stat.ValueNames() {
|
|
|
|
row.Columns = append(row.Columns, k)
|
|
|
|
values = append(values, stat.Values[k])
|
|
|
|
}
|
|
|
|
row.Values = [][]interface{}{values}
|
|
|
|
rows = append(rows, row)
|
|
|
|
}
|
|
|
|
return rows, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) {
|
2016-04-30 22:04:38 +00:00
|
|
|
dis := e.MetaClient.Databases()
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
rows := []*models.Row{}
|
|
|
|
for _, di := range dis {
|
|
|
|
row := &models.Row{Columns: []string{"retention_policy", "name", "mode", "destinations"}, Name: di.Name}
|
|
|
|
for _, rpi := range di.RetentionPolicies {
|
|
|
|
for _, si := range rpi.Subscriptions {
|
|
|
|
row.Values = append(row.Values, []interface{}{rpi.Name, si.Name, si.Mode, si.Destinations})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(row.Values) > 0 {
|
|
|
|
rows = append(rows, row)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return rows, nil
|
|
|
|
}
|
|
|
|
|
2016-07-28 22:38:08 +00:00
|
|
|
func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatement, ctx *influxql.ExecutionContext) error {
|
|
|
|
if ctx.Database == "" {
|
|
|
|
return ErrDatabaseNameRequired
|
|
|
|
}
|
|
|
|
|
|
|
|
tagValues, err := e.TSDBStore.TagValues(ctx.Database, q.Condition)
|
|
|
|
if err != nil {
|
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
Err: err,
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
emitted := false
|
|
|
|
for _, m := range tagValues {
|
|
|
|
values := m.Values
|
|
|
|
|
|
|
|
if q.Offset > 0 {
|
|
|
|
if q.Offset >= len(values) {
|
|
|
|
values = nil
|
|
|
|
} else {
|
|
|
|
values = values[q.Offset:]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if q.Limit > 0 {
|
|
|
|
if q.Limit < len(values) {
|
|
|
|
values = values[:q.Limit]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(values) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &models.Row{
|
|
|
|
Name: m.Measurement,
|
|
|
|
Columns: []string{"key", "value"},
|
|
|
|
Values: make([][]interface{}, len(values)),
|
|
|
|
}
|
|
|
|
for i, v := range values {
|
|
|
|
row.Values[i] = []interface{}{v.Key, v.Value}
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
Series: []*models.Row{row},
|
|
|
|
}
|
|
|
|
emitted = true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ensure at least one result is emitted.
|
|
|
|
if !emitted {
|
|
|
|
ctx.Results <- &influxql.Result{
|
|
|
|
StatementID: ctx.StatementID,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) {
|
2016-02-12 22:10:02 +00:00
|
|
|
row := &models.Row{Columns: []string{"user", "admin"}}
|
|
|
|
for _, ui := range e.MetaClient.Users() {
|
|
|
|
row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
|
|
|
|
}
|
|
|
|
return []*models.Row{row}, nil
|
|
|
|
}
|
|
|
|
|
2016-05-26 16:32:56 +00:00
|
|
|
// BufferedPointsWriter accumulates points destined for a single
// database/retention-policy pair and forwards them to an underlying
// pointsWriter in batches (see WritePointsInto and Flush).
type BufferedPointsWriter struct {
	w               pointsWriter   // underlying destination writer
	buf             []models.Point // buffered points; flushed once len reaches cap
	database        string         // target database for all buffered points
	retentionPolicy string         // target retention policy for all buffered points
}
|
|
|
|
|
|
|
|
func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter {
|
|
|
|
return &BufferedPointsWriter{
|
|
|
|
w: w,
|
|
|
|
buf: make([]models.Point, 0, capacity),
|
|
|
|
database: database,
|
|
|
|
retentionPolicy: retentionPolicy,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WritePointsInto buffers the points in req, flushing to the underlying
// writer each time the buffer fills. req must target the same database and
// retention policy this writer was constructed for, otherwise an error is
// returned and nothing is buffered. Points may remain buffered on return;
// callers must call Flush to guarantee delivery.
func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error {
	// Make sure we're buffering points only for the expected destination.
	if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy {
		return fmt.Errorf("writer for %s.%s can't write into %s.%s", w.database, w.retentionPolicy, req.Database, req.RetentionPolicy)
	}

	// Copy the incoming points in buffer-sized chunks; i tracks how many have
	// been consumed so far.
	for i := 0; i < len(req.Points); {
		// Get the available space in the buffer.
		avail := cap(w.buf) - len(w.buf)

		// Calculate number of points to copy into the buffer.
		n := len(req.Points[i:])
		if n > avail {
			n = avail
		}

		// Copy points into buffer.
		w.buf = append(w.buf, req.Points[i:n+i]...)

		// Advance the index by number of points copied.
		i += n

		// If buffer is full, flush points to underlying writer.
		if len(w.buf) == cap(w.buf) {
			if err := w.Flush(); err != nil {
				return err
			}
		}
	}

	return nil
}
|
|
|
|
|
|
|
|
// Flush writes all buffered points to the underlying writer.
|
|
|
|
func (w *BufferedPointsWriter) Flush() error {
|
|
|
|
if len(w.buf) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := w.w.WritePointsInto(&IntoWriteRequest{
|
|
|
|
Database: w.database,
|
|
|
|
RetentionPolicy: w.retentionPolicy,
|
|
|
|
Points: w.buf,
|
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Clear the buffer.
|
|
|
|
w.buf = w.buf[:0]
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Len returns the number of points buffered.
|
|
|
|
func (w *BufferedPointsWriter) Len() int { return len(w.buf) }
|
|
|
|
|
|
|
|
// Cap returns the capacity (in points) of the buffer.
|
|
|
|
func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) }
|
|
|
|
|
|
|
|
func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
if stmt.Target.Measurement.Database == "" {
|
|
|
|
return errNoDatabaseInTarget
|
|
|
|
}
|
|
|
|
|
|
|
|
// It might seem a bit weird that this is where we do this, since we will have to
|
|
|
|
// convert rows back to points. The Executors (both aggregate and raw) are complex
|
|
|
|
// enough that changing them to write back to the DB is going to be clumsy
|
|
|
|
//
|
|
|
|
// it might seem weird to have the write be in the QueryExecutor, but the interweaving of
|
|
|
|
// limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the
|
|
|
|
// results will be the same as when queried normally.
|
|
|
|
name := stmt.Target.Measurement.Name
|
|
|
|
if name == "" {
|
|
|
|
name = row.Name
|
|
|
|
}
|
|
|
|
|
|
|
|
points, err := convertRowToPoints(name, row)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-05-26 16:32:56 +00:00
|
|
|
if err := w.WritePointsInto(&IntoWriteRequest{
|
2016-02-12 22:10:02 +00:00
|
|
|
Database: stmt.Target.Measurement.Database,
|
|
|
|
RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
|
|
|
|
Points: points,
|
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// errNoDatabaseInTarget is returned when the target measurement of an INTO
// clause does not specify a database.
var errNoDatabaseInTarget = errors.New("no database in target")
|
|
|
|
|
|
|
|
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
|
|
|
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
|
|
|
|
// figure out which parts of the result are the time and which are the fields
|
|
|
|
timeIndex := -1
|
|
|
|
fieldIndexes := make(map[string]int)
|
|
|
|
for i, c := range row.Columns {
|
|
|
|
if c == "time" {
|
|
|
|
timeIndex = i
|
|
|
|
} else {
|
|
|
|
fieldIndexes[c] = i
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if timeIndex == -1 {
|
|
|
|
return nil, errors.New("error finding time index in result")
|
|
|
|
}
|
|
|
|
|
|
|
|
points := make([]models.Point, 0, len(row.Values))
|
|
|
|
for _, v := range row.Values {
|
|
|
|
vals := make(map[string]interface{})
|
|
|
|
for fieldName, fieldIndex := range fieldIndexes {
|
|
|
|
val := v[fieldIndex]
|
|
|
|
if val != nil {
|
|
|
|
vals[fieldName] = v[fieldIndex]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-30 16:49:53 +00:00
|
|
|
p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
|
2016-02-12 22:10:02 +00:00
|
|
|
if err != nil {
|
|
|
|
// Drop points that can't be stored
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
points = append(points, p)
|
|
|
|
}
|
|
|
|
|
|
|
|
return points, nil
|
|
|
|
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
// NormalizeStatement adds a default database and policy to the measurements in statement.
// It walks the statement tree, filling in defaultDatabase wherever a database
// is omitted, and returns the first normalization error encountered.
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error) {
	influxql.WalkFunc(stmt, func(node influxql.Node) {
		// Once an error has been recorded, skip the remaining nodes.
		if err != nil {
			return
		}
		switch node := node.(type) {
		case *influxql.ShowRetentionPoliciesStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.ShowMeasurementsStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.Measurement:
			switch stmt.(type) {
			case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement:
				// DB and RP not supported by these statements so don't rewrite into invalid
				// statements
			default:
				err = e.normalizeMeasurement(node, defaultDatabase)
			}
		}
	})
	return
}
|
|
|
|
|
2016-03-31 22:12:29 +00:00
|
|
|
func (e *StatementExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
|
2016-02-12 22:10:02 +00:00
|
|
|
// Targets (measurements in an INTO clause) can have blank names, which means it will be
|
|
|
|
// the same as the measurement name it came from in the FROM clause.
|
|
|
|
if !m.IsTarget && m.Name == "" && m.Regex == nil {
|
|
|
|
return errors.New("invalid measurement")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Measurement does not have an explicit database? Insert default.
|
|
|
|
if m.Database == "" {
|
|
|
|
m.Database = defaultDatabase
|
|
|
|
}
|
|
|
|
|
|
|
|
// The database must now be specified by this point.
|
|
|
|
if m.Database == "" {
|
2016-07-28 22:38:08 +00:00
|
|
|
return ErrDatabaseNameRequired
|
2016-02-12 22:10:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Find database.
|
2016-04-30 22:04:38 +00:00
|
|
|
di := e.MetaClient.Database(m.Database)
|
|
|
|
if di == nil {
|
2016-02-12 22:10:02 +00:00
|
|
|
return influxdb.ErrDatabaseNotFound(m.Database)
|
|
|
|
}
|
|
|
|
|
|
|
|
// If no retention policy was specified, use the default.
|
|
|
|
if m.RetentionPolicy == "" {
|
|
|
|
if di.DefaultRetentionPolicy == "" {
|
|
|
|
return fmt.Errorf("default retention policy not set for: %s", di.Name)
|
|
|
|
}
|
|
|
|
m.RetentionPolicy = di.DefaultRetentionPolicy
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// IntoWriteRequest is a partial copy of cluster.WriteRequest. It describes a
// batch of points to write into a specific database and retention policy
// (used for SELECT ... INTO).
type IntoWriteRequest struct {
	Database        string         // destination database
	RetentionPolicy string         // destination retention policy
	Points          []models.Point // points to write
}
|
|
|
|
|
2016-02-19 20:38:02 +00:00
|
|
|
// TSDBStore is an interface for accessing the time series data store.
type TSDBStore interface {
	// Shard creation and writes.
	CreateShard(database, policy string, shardID uint64, enabled bool) error
	WriteToShard(shardID uint64, points []models.Point) error

	// Backup/restore of individual shards.
	RestoreShard(id uint64, r io.Reader) error
	BackupShard(id uint64, since time.Time, w io.Writer) error

	// Deletion of databases, measurements, retention policies, series, and shards.
	DeleteDatabase(name string) error
	DeleteMeasurement(database, name string) error
	DeleteRetentionPolicy(database, name string) error
	DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
	DeleteShard(id uint64) error

	// IteratorCreator returns an iterator creator over the given shards.
	IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error)

	// Metadata queries used by SHOW MEASUREMENTS and SHOW TAG VALUES.
	Measurements(database string, cond influxql.Expr) ([]string, error)
	TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
}
|
|
|
|
|
|
|
|
// LocalTSDBStore embeds *tsdb.Store; the IteratorCreator method defined on it
// adapts the store's shard-ID-based API to the meta.ShardInfo-based signature
// used by this package's TSDBStore interface.
type LocalTSDBStore struct {
	*tsdb.Store
}
|
|
|
|
|
2016-06-10 15:14:21 +00:00
|
|
|
func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
|
2016-04-05 19:33:44 +00:00
|
|
|
shardIDs := make([]uint64, len(shards))
|
|
|
|
for i, sh := range shards {
|
|
|
|
shardIDs[i] = sh.ID
|
|
|
|
}
|
2016-06-10 15:14:21 +00:00
|
|
|
return s.Store.IteratorCreator(shardIDs, opt)
|
2016-03-31 22:12:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.
type ShardIteratorCreator interface {
	// ShardIteratorCreator returns an iterator creator for the shard with the given id.
	ShardIteratorCreator(id uint64) influxql.IteratorCreator
}
|
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
// joinUint64 returns a comma-delimited string of uint64 numbers.
func joinUint64(a []uint64) string {
	var buf bytes.Buffer
	for i, x := range a {
		// Separator precedes every element except the first.
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.WriteString(strconv.FormatUint(x, 10))
	}
	return buf.String()
}
|
|
|
|
|
|
|
|
// stringSet represents a set of strings. A nil stringSet is readable but not
// writable; use newStringSet to create one before adding elements.
type stringSet map[string]struct{}
|
|
|
|
|
|
|
|
// newStringSet returns an empty stringSet.
|
|
|
|
func newStringSet() stringSet {
|
|
|
|
return make(map[string]struct{})
|
|
|
|
}
|
|
|
|
|
|
|
|
// add adds strings to the set.
|
|
|
|
func (s stringSet) add(ss ...string) {
|
|
|
|
for _, n := range ss {
|
|
|
|
s[n] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// contains reports whether ss is an element of the set.
func (s stringSet) contains(ss string) bool {
	_, ok := s[ss]
	return ok
}
|
|
|
|
|
|
|
|
// list returns the current elements in the set, in sorted order.
|
|
|
|
func (s stringSet) list() []string {
|
|
|
|
l := make([]string, 0, len(s))
|
|
|
|
for k := range s {
|
|
|
|
l = append(l, k)
|
|
|
|
}
|
|
|
|
sort.Strings(l)
|
|
|
|
return l
|
|
|
|
}
|
|
|
|
|
|
|
|
// union returns the union of this set and another.
|
|
|
|
func (s stringSet) union(o stringSet) stringSet {
|
|
|
|
ns := newStringSet()
|
|
|
|
for k := range s {
|
|
|
|
ns[k] = struct{}{}
|
|
|
|
}
|
|
|
|
for k := range o {
|
|
|
|
ns[k] = struct{}{}
|
|
|
|
}
|
|
|
|
return ns
|
|
|
|
}
|
|
|
|
|
|
|
|
// intersect returns the intersection of this set and another.
|
|
|
|
func (s stringSet) intersect(o stringSet) stringSet {
|
|
|
|
shorter, longer := s, o
|
|
|
|
if len(longer) < len(shorter) {
|
|
|
|
shorter, longer = longer, shorter
|
|
|
|
}
|
|
|
|
|
|
|
|
ns := newStringSet()
|
|
|
|
for k := range shorter {
|
|
|
|
if _, ok := longer[k]; ok {
|
|
|
|
ns[k] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ns
|
|
|
|
}
|