optimize SHOW TAG VALUES

This commit optimizes `SHOW TAG VALUES` so that it avoids the
`SELECT` query engine execution and iterator creation. There
are also optimizations to reduce individual memory allocations
and to reduce in-memory heap size by only operating on one
measurement at a time.

Execution time has been reduced to approximately 900ms for
500,000 rows. This is about 2µs per row. Of this time,
approximately 1µs is spent retrieving and sorting the row
and 1µs is spent encoding into JSON and writing to the
response body.
pull/6792/head
Ben Johnson 2016-06-06 13:53:54 -06:00
parent 7da0638a84
commit 1b94cd2686
No known key found for this signature in database
GPG Key ID: 780E98C6BEDA0915
4 changed files with 183 additions and 137 deletions

View File

@ -7,6 +7,7 @@ import (
"io"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb"
@ -403,6 +404,12 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
}
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
// Handle SHOW TAG VALUES separately so it can be optimized.
// https://github.com/influxdata/influxdb/issues/6233
if source, ok := stmt.Sources[0].(*influxql.Measurement); ok && source.Name == "_tags" {
return e.executeShowTagValues(stmt, ctx)
}
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh}
@ -597,6 +604,130 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
return e.TSDBStore.IteratorCreator(shards)
}
// executeShowTagValues serves SHOW TAG VALUES directly from the database index
// instead of going through the SELECT iterator machinery. It sends one result
// per measurement that has at least one matching key/value pair, building the
// values for a single measurement at a time.
//
// stmt must carry a condition, and its first source must be an
// *influxql.Measurement; executeSelectStatement only dispatches here after
// that same type assertion has succeeded. An error is returned when the
// condition is missing or when an index lookup fails. If nothing matches, a
// single result with an empty series list is sent so the client always
// receives a response for the statement.
func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
	if stmt.Condition == nil {
		return errors.New("a condition is required")
	}

	source := stmt.Sources[0].(*influxql.Measurement)
	index := e.TSDBStore.DatabaseIndex(source.Database)
	if index == nil {
		ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
		return nil
	}

	// Reduce the condition to the comparisons against "_name" so it can be
	// used to select measurements.
	measurementExpr := influxql.CloneExpr(stmt.Condition)
	measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
		switch e := e.(type) {
		case *influxql.BinaryExpr:
			switch e.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := e.LHS.(*influxql.VarRef)
				if !ok || tag.Val != "_name" {
					return nil
				}
			}
		}
		return e
	}), nil)

	mms, ok, err := index.MeasurementsByExpr(measurementExpr)
	if err != nil {
		return err
	} else if !ok {
		// The expression did not constrain measurements; use all of them.
		mms = index.Measurements()
		sort.Sort(mms)
	}

	// If there are no measurements, return immediately.
	if len(mms) == 0 {
		ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
		return nil
	}

	// Reduce the condition to the comparisons against regular tags (names
	// without a leading underscore) so it can be used to select series.
	filterExpr := influxql.CloneExpr(stmt.Condition)
	filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
		switch e := e.(type) {
		case *influxql.BinaryExpr:
			switch e.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
				tag, ok := e.LHS.(*influxql.VarRef)
				if !ok || strings.HasPrefix(tag.Val, "_") {
					return nil
				}
			}
		}
		return e
	}), nil)

	var emitted bool
	columns := stmt.ColumnNames()
	for _, mm := range mms {
		ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
		if err != nil {
			return err
		}
		ss := mm.SeriesByIDSlice(ids)

		// Determine a list of keys from condition. When ok is false the
		// condition does not restrict keys and every tag key is accepted.
		keySet, ok, err := mm.TagKeysByExpr(stmt.Condition)
		if err != nil {
			return err
		}

		// Loop over all keys for each series, de-duplicating key/value pairs.
		m := make(map[keyValue]struct{}, len(ss))
		for _, series := range ss {
			// The ids were collected under a separate lock acquisition from
			// the lookup in SeriesByIDSlice, which fills in nil for any id it
			// no longer finds. Skip those instead of dereferencing nil.
			if series == nil {
				continue
			}
			for key, value := range series.Tags {
				if ok {
					if _, exists := keySet[key]; !exists {
						continue
					}
				}
				m[keyValue{key, value}] = struct{}{}
			}
		}

		// Move to next measurement if no key/values match.
		if len(m) == 0 {
			continue
		}

		// Sort key/value set.
		a := make([]keyValue, 0, len(m))
		for kv := range m {
			a = append(a, kv)
		}
		sort.Sort(keyValues(a))

		// Convert to result values. All rows share one backing slab so the
		// per-row [key, value] slices do not each need their own allocation.
		slab := make([]interface{}, len(a)*2)
		values := make([][]interface{}, len(a))
		for i, elem := range a {
			slab[i*2], slab[i*2+1] = elem.key, elem.value
			values[i] = slab[i*2 : i*2+2]
		}

		// Send result to client.
		ctx.Results <- &influxql.Result{
			StatementID: ctx.StatementID,
			Series: []*models.Row{{
				Name:    mm.Name,
				Columns: columns,
				Values:  values,
			}},
		}
		emitted = true
	}

	// Always emit at least one result.
	if !emitted {
		ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
	}
	return nil
}
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
dis := e.MetaClient.Databases()
@ -1008,6 +1139,7 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
DatabaseIndex(name string) *tsdb.DatabaseIndex
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}
@ -1105,3 +1237,19 @@ type uint64Slice []uint64
// Len returns the number of elements in the slice.
func (a uint64Slice) Len() int { return len(a) }
// Swap exchanges the elements at positions i and j.
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less reports whether the element at i is numerically smaller than the one at j.
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
// keyValue pairs a tag key with one of its values.
type keyValue struct {
	key   string
	value string
}

// keyValues is a list of keyValue pairs that orders by key first and by
// value second.
type keyValues []keyValue

// Len returns the number of pairs in the list.
func (a keyValues) Len() int {
	return len(a)
}

// Swap exchanges the pairs at positions i and j.
func (a keyValues) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the pair at i orders before the pair at j: keys are
// compared first, and values break ties between equal keys.
func (a keyValues) Less(i, j int) bool {
	x, y := a[i], a[j]
	if x.key != y.key {
		return x.key < y.key
	}
	return x.value < y.value
}

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)
const (
@ -200,6 +201,7 @@ type TSDBStore struct {
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DatabaseIndexFn func(name string) *tsdb.DatabaseIndex
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}
@ -267,6 +269,10 @@ func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
return s.ShardIteratorCreatorFn(id)
}
// DatabaseIndex delegates to the DatabaseIndexFn field, passing name through
// and returning whatever that function returns.
func (s *TSDBStore) DatabaseIndex(name string) *tsdb.DatabaseIndex {
	lookup := s.DatabaseIndexFn
	return lookup(name)
}
// MustParseQuery parses s into a query. Panic on error.
func MustParseQuery(s string) *influxql.Query {
q, err := influxql.ParseQuery(s)

View File

@ -239,12 +239,18 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
return ss.Tags
}
// measurementsByExpr takes an expression containing only tags and returns a
// MeasurementsByExpr evaluates a tag-only expression against the index and
// returns the measurements that match it. It holds the index read lock for
// the duration of the call and delegates to measurementsByExpr.
//
// The bool result reports whether expr was a measurement expression. It lets
// callers tell apart an empty list because every measurement was filtered out
// (true) from an empty list because the expression was not evaluated (false).
func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	mms, evaluated, err := d.measurementsByExpr(expr)
	return mms, evaluated, err
}
func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
if expr == nil {
return nil, false, nil
@ -538,6 +544,17 @@ func (m *Measurement) SeriesByID(id uint64) *Series {
return m.seriesByID[id]
}
// SeriesByIDSlice looks up every identifier in ids and returns the series in
// the same order. The result always has len(ids) entries; an identifier that
// is not present in the measurement yields a nil entry at its position.
func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series {
	m.mu.RLock()
	defer m.mu.RUnlock()

	found := make([]*Series, 0, len(ids))
	for _, id := range ids {
		found = append(found, m.seriesByID[id])
	}
	return found
}
// AppendSeriesKeysByID appends keys for a list of series ids to a buffer.
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string {
m.mu.RLock()
@ -1121,8 +1138,14 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr,
return exprs
}
// seriesIDsAllOrByExpr walks an expressions for matching series IDs
// SeriesIDsAllOrByExpr returns the series IDs matching expr or, when no
// expression is given, all series IDs for the measurement. It holds the
// measurement read lock while delegating to seriesIDsAllOrByExpr.
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	ids, err := m.seriesIDsAllOrByExpr(expr)
	return ids, err
}
func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
// If no expression given or the measurement has no series,
// we can take just return the ids or nil accordingly.
@ -1142,7 +1165,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error
}
// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
@ -1175,12 +1198,12 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
}
return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil
case influxql.AND, influxql.OR:
lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS)
lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS)
if err != nil {
return nil, false, err
}
rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS)
rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS)
if err != nil {
return nil, false, err
}
@ -1201,7 +1224,7 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
return nil, false, fmt.Errorf("invalid operator")
}
case *influxql.ParenExpr:
return m.tagKeysByExpr(e.Expr)
return m.TagKeysByExpr(e.Expr)
}
return nil, false, fmt.Errorf("%#v", expr)
}

View File

@ -12,7 +12,6 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
@ -515,8 +514,6 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
return NewSeriesIterator(s, opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
case "_tags":
return NewTagValuesIterator(s, opt)
default:
return nil, fmt.Errorf("unknown system source: %s", m.Name)
}
@ -1325,134 +1322,6 @@ type tagValuesIterator struct {
}
}
// NewTagValuesIterator builds an iterator over the tag key/value pairs of the
// series in sh that satisfy opt.Condition. A condition is mandatory; an error
// is returned without one, or when any index lookup fails.
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
	if opt.Condition == nil {
		return nil, errors.New("a condition is required")
	}

	// Strip every comparison that is not against "_name"; what remains
	// selects measurements.
	onlyMeasurementName := func(expr influxql.Expr) influxql.Expr {
		bin, isBinary := expr.(*influxql.BinaryExpr)
		if !isBinary {
			return expr
		}
		switch bin.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			if ref, isRef := bin.LHS.(*influxql.VarRef); !isRef || ref.Val != "_name" {
				return nil
			}
		}
		return expr
	}
	nameExpr := influxql.CloneExpr(opt.Condition)
	nameExpr = influxql.Reduce(influxql.RewriteExpr(nameExpr, onlyMeasurementName), nil)

	measurements, matched, err := sh.index.measurementsByExpr(nameExpr)
	if err != nil {
		return nil, err
	}
	if !matched {
		// No measurement constraint was evaluated: consider all of them.
		measurements = sh.index.Measurements()
		sort.Sort(measurements)
	}

	// Nothing to iterate over.
	if len(measurements) == 0 {
		return &tagValuesIterator{}, nil
	}

	// Strip every comparison against an underscore-prefixed name; what
	// remains selects series by their regular tags.
	onlyRegularTags := func(expr influxql.Expr) influxql.Expr {
		bin, isBinary := expr.(*influxql.BinaryExpr)
		if !isBinary {
			return expr
		}
		switch bin.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			if ref, isRef := bin.LHS.(*influxql.VarRef); !isRef || strings.HasPrefix(ref.Val, "_") {
				return nil
			}
		}
		return expr
	}
	tagExpr := influxql.CloneExpr(opt.Condition)
	tagExpr = influxql.Reduce(influxql.RewriteExpr(tagExpr, onlyRegularTags), nil)

	var selected []*Series
	keys := newStringSet()
	for _, mm := range measurements {
		wanted, restricted, err := mm.tagKeysByExpr(opt.Condition)
		if err != nil {
			return nil, err
		}
		if restricted {
			keys = keys.union(wanted)
		} else {
			keys.add(mm.TagKeys()...)
		}

		ids, err := mm.seriesIDsAllOrByExpr(tagExpr)
		if err != nil {
			return nil, err
		}
		for _, id := range ids {
			selected = append(selected, mm.SeriesByID(id))
		}
	}

	itr := &tagValuesIterator{
		series: selected,
		keys:   keys.list(),
		fields: influxql.VarRefs(opt.Aux).Strings(),
	}
	return itr, nil
}
// Stats returns the iterator statistics, which are always the zero value for
// this iterator.
func (itr *tagValuesIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
// Close does nothing and always returns a nil error.
func (itr *tagValuesIterator) Close() error {
	var err error
	return err
}
// Next returns the next tag key/value point, or (nil, nil) once every series
// has been exhausted. Each series is visited in order, and for each series the
// iterator walks its list of keys, skipping keys the series does not have.
func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) {
	for {
		// The current series has no keys left: advance to the next series
		// and start over with the full key list.
		if len(itr.buf.keys) == 0 {
			if len(itr.series) == 0 {
				return nil, nil
			}
			itr.buf.s, itr.series = itr.series[0], itr.series[1:]
			itr.buf.keys = itr.keys
			continue
		}

		tagKey := itr.buf.keys[0]
		tagValue, present := itr.buf.s.Tags[tagKey]
		if !present {
			// This series does not carry the key; try the next one.
			itr.buf.keys = itr.buf.keys[1:]
			continue
		}

		// Fill the auxiliary fields in the order they were requested. Fields
		// other than "_tagKey" and "value" are left nil.
		aux := make([]interface{}, len(itr.fields))
		for idx, field := range itr.fields {
			if field == "_tagKey" {
				aux[idx] = tagKey
			} else if field == "value" {
				aux[idx] = tagValue
			}
		}

		point := &influxql.FloatPoint{
			Name: itr.buf.s.measurement.Name,
			Aux:  aux,
		}

		// Consume the key only after the point has been built.
		itr.buf.keys = itr.buf.keys[1:]
		return point, nil
	}
}
// measurementKeyFunc is the function called by measurementKeysIterator.
type measurementKeyFunc func(m *Measurement) []string