Merge pull request #6792 from benbjohnson/show-tag-values

Optimize SHOW TAG VALUES
pull/6803/head v1.0.0-beta1
Ben Johnson 2016-06-06 16:00:12 -06:00
commit bf3c22689b
4 changed files with 183 additions and 137 deletions

View File

@ -7,6 +7,7 @@ import (
"io"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb"
@ -403,6 +404,12 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
}
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
// Handle SHOW TAG VALUES separately so it can be optimized.
// https://github.com/influxdata/influxdb/issues/6233
if source, ok := stmt.Sources[0].(*influxql.Measurement); ok && source.Name == "_tags" {
return e.executeShowTagValues(stmt, ctx)
}
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh}
@ -597,6 +604,130 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
return e.TSDBStore.IteratorCreator(shards)
}
// executeShowTagValues answers a SHOW TAG VALUES query (a SELECT whose first
// source is the "_tags" measurement) from the database index obtained through
// TSDBStore.DatabaseIndex.
//
// The statement must carry a condition; otherwise an error is returned. For
// every selected measurement that has at least one matching tag key/value
// pair, one result holding a single row is sent on ctx.Results, with the pairs
// sorted by key and then by value. If no such result was sent, a single result
// with an empty series list is sent instead. Errors reported by the index
// lookups are returned unchanged.
func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
	if stmt.Condition == nil {
		return errors.New("a condition is required")
	}

	// sendEmpty emits a result whose series list is empty but non-nil.
	sendEmpty := func() {
		ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
	}

	source := stmt.Sources[0].(*influxql.Measurement)
	index := e.TSDBStore.DatabaseIndex(source.Database)
	if index == nil {
		sendEmpty()
		return nil
	}

	// Only comparisons against "_name" take part in selecting measurements.
	measurementExpr := filterTagValuesCondition(stmt.Condition, func(name string) bool {
		return name == "_name"
	})
	mms, ok, err := index.MeasurementsByExpr(measurementExpr)
	if err != nil {
		return err
	}
	if !ok {
		// The expression did not select measurements, so consider all of them.
		mms = index.Measurements()
		sort.Sort(mms)
	}

	// If there are no measurements, return immediately.
	if len(mms) == 0 {
		sendEmpty()
		return nil
	}

	// Comparisons on names starting with "_" are dropped before the
	// condition is used to filter series.
	filterExpr := filterTagValuesCondition(stmt.Condition, func(name string) bool {
		return !strings.HasPrefix(name, "_")
	})

	var emitted bool
	columns := stmt.ColumnNames()
	for _, mm := range mms {
		ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
		if err != nil {
			return err
		}
		ss := mm.SeriesByIDSlice(ids)

		// Determine a list of keys from condition.
		keySet, ok, err := mm.TagKeysByExpr(stmt.Condition)
		if err != nil {
			return err
		}

		// Collect the distinct key/value pairs across all matching series.
		m := make(map[keyValue]struct{}, len(ss))
		for _, series := range ss {
			// The ids and the series are fetched under separate lock
			// acquisitions, so an id may no longer resolve to a series by the
			// time it is looked up. Skip such entries instead of panicking.
			if series == nil {
				continue
			}
			for key, value := range series.Tags {
				// When the condition names specific keys, keep only those.
				if ok {
					if _, exists := keySet[key]; !exists {
						continue
					}
				}
				m[keyValue{key, value}] = struct{}{}
			}
		}

		// Move to next measurement if no key/values match.
		if len(m) == 0 {
			continue
		}

		// Sort key/value set.
		a := make([]keyValue, 0, len(m))
		for kv := range m {
			a = append(a, kv)
		}
		sort.Sort(keyValues(a))

		// Convert to result values. All rows share one backing slab so that
		// only two allocations are needed regardless of the number of pairs.
		slab := make([]interface{}, len(a)*2)
		values := make([][]interface{}, len(a))
		for i, elem := range a {
			slab[i*2], slab[i*2+1] = elem.key, elem.value
			values[i] = slab[i*2 : i*2+2]
		}

		// Send result to client.
		ctx.Results <- &influxql.Result{
			StatementID: ctx.StatementID,
			Series: []*models.Row{{
				Name:    mm.Name,
				Columns: columns,
				Values:  values,
			}},
		}
		emitted = true
	}

	// Always emit at least one result.
	if !emitted {
		sendEmpty()
	}
	return nil
}

// filterTagValuesCondition clones cond, rewrites it so that every =, !=, =~
// or !~ comparison whose left-hand side is not a variable reference accepted
// by keep is replaced with nil, and returns the reduced expression.
func filterTagValuesCondition(cond influxql.Expr, keep func(name string) bool) influxql.Expr {
	rewritten := influxql.RewriteExpr(influxql.CloneExpr(cond), func(expr influxql.Expr) influxql.Expr {
		switch bin := expr.(type) {
		case *influxql.BinaryExpr:
			switch bin.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := bin.LHS.(*influxql.VarRef)
				if !ok || !keep(tag.Val) {
					return nil
				}
			}
		}
		return expr
	})
	return influxql.Reduce(rewritten, nil)
}
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
dis := e.MetaClient.Databases()
@ -1008,6 +1139,7 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
DatabaseIndex(name string) *tsdb.DatabaseIndex
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}
@ -1105,3 +1237,19 @@ type uint64Slice []uint64
// Len returns the number of elements in the slice.
func (a uint64Slice) Len() int {
	return len(a)
}

// Swap exchanges the elements at positions i and j.
func (a uint64Slice) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the element at i is smaller than the element at j.
func (a uint64Slice) Less(i, j int) bool {
	return a[j] > a[i]
}
// keyValue pairs a key with one of its values.
type keyValue struct {
	key, value string
}

// keyValues is a list of key/value pairs that can be sorted by key and then
// by value.
type keyValues []keyValue

// Len returns the number of pairs in the list.
func (a keyValues) Len() int {
	return len(a)
}

// Swap exchanges the pairs at positions i and j.
func (a keyValues) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less orders pairs by key and breaks ties between equal keys by value.
func (a keyValues) Less(i, j int) bool {
	x, y := a[i], a[j]
	if x.key != y.key {
		return x.key < y.key
	}
	return x.value < y.value
}

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)
const (
@ -200,6 +201,7 @@ type TSDBStore struct {
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DatabaseIndexFn func(name string) *tsdb.DatabaseIndex
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}
@ -267,6 +269,10 @@ func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
return s.ShardIteratorCreatorFn(id)
}
// DatabaseIndex forwards the call to the DatabaseIndexFn hook and returns
// whatever the hook returns.
func (s *TSDBStore) DatabaseIndex(name string) *tsdb.DatabaseIndex {
	index := s.DatabaseIndexFn(name)
	return index
}
// MustParseQuery parses s into a query. Panic on error.
func MustParseQuery(s string) *influxql.Query {
q, err := influxql.ParseQuery(s)

View File

@ -239,12 +239,18 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
return ss.Tags
}
// measurementsByExpr takes an expression containing only tags and returns a
// MeasurementsByExpr returns the measurements matching expr, an expression
// that contains only tags. It calls measurementsByExpr while holding d.mu via
// RLock.
//
// The boolean result reports whether expr was a measurement expression. It
// lets a caller distinguish an empty list produced because every measurement
// was filtered out (true) from an empty list produced because the expression
// was not evaluated (false).
func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	mms, ok, err := d.measurementsByExpr(expr)
	return mms, ok, err
}
func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
if expr == nil {
return nil, false, nil
@ -538,6 +544,17 @@ func (m *Measurement) SeriesByID(id uint64) *Series {
return m.seriesByID[id]
}
// SeriesByIDSlice looks up every identifier in ids and returns the series in
// the same order. The result always has len(ids) entries; each entry is
// whatever m.seriesByID holds for that identifier. The lookup runs while
// holding m.mu via RLock.
func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series {
	m.mu.RLock()
	defer m.mu.RUnlock()

	found := make([]*Series, 0, len(ids))
	for _, id := range ids {
		found = append(found, m.seriesByID[id])
	}
	return found
}
// AppendSeriesKeysByID appends keys for a list of series ids to a buffer.
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string {
m.mu.RLock()
@ -1121,8 +1138,14 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr,
return exprs
}
// seriesIDsAllOrByExpr walks an expressions for matching series IDs
// SeriesIDsAllOrByExpr walks an expression for matching series IDs or, if no
// expression is given, returns all series IDs for the measurement. It calls
// seriesIDsAllOrByExpr while holding m.mu via RLock.
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	ids, err := m.seriesIDsAllOrByExpr(expr)
	return ids, err
}
func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
// If no expression given or the measurement has no series,
// we can just return the ids or nil accordingly.
@ -1142,7 +1165,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error
}
// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
@ -1175,12 +1198,12 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
}
return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil
case influxql.AND, influxql.OR:
lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS)
lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS)
if err != nil {
return nil, false, err
}
rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS)
rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS)
if err != nil {
return nil, false, err
}
@ -1201,7 +1224,7 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
return nil, false, fmt.Errorf("invalid operator")
}
case *influxql.ParenExpr:
return m.tagKeysByExpr(e.Expr)
return m.TagKeysByExpr(e.Expr)
}
return nil, false, fmt.Errorf("%#v", expr)
}

View File

@ -12,7 +12,6 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
@ -515,8 +514,6 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
return NewSeriesIterator(s, opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
case "_tags":
return NewTagValuesIterator(s, opt)
default:
return nil, fmt.Errorf("unknown system source: %s", m.Name)
}
@ -1325,134 +1322,6 @@ type tagValuesIterator struct {
}
}
// NewTagValuesIterator returns a new tagValuesIterator over shard sh. A nil
// opt.Condition is rejected with an error.
//
// Measurements are selected from the comparisons on "_name" in the condition;
// when that expression does not select measurements, every measurement in the
// shard index is used, in sorted order. The series of each measurement are
// filtered with the condition stripped of comparisons on names starting with
// "_". The iterator's fields are taken from opt.Aux.
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
	if opt.Condition == nil {
		return nil, errors.New("a condition is required")
	}

	// prune clones the condition, replaces with nil every =, !=, =~ or !~
	// comparison whose left-hand side is not a variable reference accepted by
	// keep, and reduces the result.
	prune := func(keep func(name string) bool) influxql.Expr {
		rewritten := influxql.RewriteExpr(influxql.CloneExpr(opt.Condition), func(expr influxql.Expr) influxql.Expr {
			bin, isBinary := expr.(*influxql.BinaryExpr)
			if !isBinary {
				return expr
			}
			switch bin.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				if ref, isRef := bin.LHS.(*influxql.VarRef); !isRef || !keep(ref.Val) {
					return nil
				}
			}
			return expr
		})
		return influxql.Reduce(rewritten, nil)
	}

	nameExpr := prune(func(name string) bool { return name == "_name" })
	measurements, selected, err := sh.index.measurementsByExpr(nameExpr)
	if err != nil {
		return nil, err
	}
	if !selected {
		measurements = sh.index.Measurements()
		sort.Sort(measurements)
	}

	// With no measurements there is nothing to iterate over.
	if len(measurements) == 0 {
		return &tagValuesIterator{}, nil
	}

	seriesExpr := prune(func(name string) bool { return !strings.HasPrefix(name, "_") })

	var matched []*Series
	tagKeys := newStringSet()
	for _, mm := range measurements {
		wanted, restricted, err := mm.tagKeysByExpr(opt.Condition)
		switch {
		case err != nil:
			return nil, err
		case restricted:
			tagKeys = tagKeys.union(wanted)
		default:
			// The condition names no keys, so take all keys of the measurement.
			tagKeys.add(mm.TagKeys()...)
		}

		ids, err := mm.seriesIDsAllOrByExpr(seriesExpr)
		if err != nil {
			return nil, err
		}
		for _, id := range ids {
			matched = append(matched, mm.SeriesByID(id))
		}
	}

	itr := &tagValuesIterator{
		series: matched,
		keys:   tagKeys.list(),
		fields: influxql.VarRefs(opt.Aux).Strings(),
	}
	return itr, nil
}
// Stats returns stats about the points processed. This iterator always
// returns an empty influxql.IteratorStats value.
func (itr *tagValuesIterator) Stats() influxql.IteratorStats {
	return influxql.IteratorStats{}
}
// Close closes the iterator. It does nothing and always returns nil.
func (itr *tagValuesIterator) Close() error {
	return nil
}
// Next emits the next point in the iterator. For each remaining series it
// walks the iterator's keys and yields one point per key present in that
// series' tags. It returns a nil point and a nil error once all series have
// been consumed.
//
// Each point carries the measurement name and one auxiliary value per
// requested field: the tag key for "_tagKey", the tag value for "value", and
// nil for any other field name.
func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) {
	for {
		// The current series has no keys left: advance to the next series.
		if len(itr.buf.keys) == 0 {
			if len(itr.series) == 0 {
				return nil, nil
			}
			itr.buf.s, itr.series = itr.series[0], itr.series[1:]
			itr.buf.keys = itr.keys
			continue
		}

		tagKey := itr.buf.keys[0]
		tagValue, present := itr.buf.s.Tags[tagKey]
		if !present {
			// This series does not carry the key; try the next one.
			itr.buf.keys = itr.buf.keys[1:]
			continue
		}

		// Prepare auxiliary fields.
		aux := make([]interface{}, len(itr.fields))
		for i := range itr.fields {
			if name := itr.fields[i]; name == "_tagKey" {
				aux[i] = tagKey
			} else if name == "value" {
				aux[i] = tagValue
			}
		}

		// Build the point, then consume the key.
		point := &influxql.FloatPoint{
			Name: itr.buf.s.measurement.Name,
			Aux:  aux,
		}
		itr.buf.keys = itr.buf.keys[1:]
		return point, nil
	}
}
// measurementKeyFunc is the function called by measurementKeysIterator.
// It takes a measurement and returns a slice of strings.
// NOTE(review): presumably the returned strings are the keys to iterate over;
// confirm against measurementKeysIterator.
type measurementKeyFunc func(m *Measurement) []string