Pull in new index filter

pull/8660/head
Jason Wilder 2017-07-31 10:26:14 -06:00 committed by Edd Robinson
parent 1e9ce8e0a7
commit 94a48774b7
8 changed files with 78 additions and 0 deletions

View File

@ -480,6 +480,11 @@ func NewConfig() *run.Config {
c.Data.Dir = MustTempFile()
c.Data.WALDir = MustTempFile()
indexVersion := os.Getenv("INFLUXDB_DATA_INDEX_VERSION")
if indexVersion != "" {
c.Data.Index = indexVersion
}
c.HTTPD.Enabled = true
c.HTTPD.BindAddress = "127.0.0.1:0"
c.HTTPD.LogEnabled = testing.Verbose()

View File

@ -63,6 +63,7 @@ type Engine interface {
// TagKeys(name []byte) ([][]byte, error)
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(name, key []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int

View File

@ -348,6 +348,10 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[
return e.index.MeasurementTagKeysByExpr(name, expr)
}
func (e *Engine) MeasurementTagKeyValuesByExpr(name, key []byte, expr influxql.Expr) (map[string]struct{}, error) {
return e.index.MeasurementTagKeyValuesByExpr(name, key, expr)
}
func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
return e.index.ForEachMeasurementTagKey(name, fn)
}

View File

@ -35,6 +35,7 @@ type Index interface {
HasTagKey(name, key []byte) (bool, error)
TagSets(name []byte, options influxql.IteratorOptions) ([]*influxql.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(name, key []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int

View File

@ -269,6 +269,30 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
return mm.TagKeysByExpr(expr)
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
func (i *Index) MeasurementTagKeyValuesByExpr(name, key []byte, expr influxql.Expr) (map[string]struct{}, error) {
i.mu.RLock()
mm := i.measurements[string(name)]
i.mu.RUnlock()
if mm == nil {
return nil, nil
}
ids, _, _ := mm.WalkWhereForSeriesIds(expr)
if ids.Len() == 0 && expr == nil {
values := mm.TagValues(string(key))
x := make(map[string]struct{}, len(values))
for _, v := range values {
x[v] = struct{}{}
}
return x, nil
}
vals := mm.tagValuesByKeyAndSeriesID([]string{string(key)}, ids)[string(key)]
return vals, nil
}
// ForEachMeasurementTagKey iterates over all tag keys for a measurement.
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
// Ensure we do not hold a lock on the index while fn executes in case fn tries

View File

@ -282,6 +282,31 @@ func (fs *FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma
return nil, fmt.Errorf("%#v", expr)
}
func (fs *FileSet) tagValuesByKeyAndExpr(name, key []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) (map[string]struct{}, error) {
itr, err := fs.seriesByExprIterator(name, expr, fieldset.Fields(string(name)))
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
// Set of all tag values.
tagValues := make(map[string]struct{})
// Iterate all series to collect tag values.
for e := itr.Next(); e != nil; e = itr.Next() {
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
tags := e.Tags()
tagVal := tags.Get(key)
if _, ok := tagValues[string(tagVal)]; !ok {
tagValues[string(tagVal)] = struct{}{}
}
}
return tagValues, nil
}
// tagKeysByFilter will filter the tag keys for the measurement.
func (fs *FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, regex *regexp.Regexp) map[string]struct{} {
ss := make(map[string]struct{})

View File

@ -614,6 +614,23 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
return fs.MeasurementTagKeysByExpr(name, expr)
}
func (i *Index) MeasurementTagKeyValuesByExpr(name, key []byte, expr influxql.Expr) (map[string]struct{}, error) {
fs := i.RetainFileSet()
defer fs.Release()
values := make(map[string]struct{})
// If there is not expr, we return all tag values for the key
if expr == nil {
itr := fs.TagValueIterator(name, key)
for val := itr.Next(); val != nil; val = itr.Next() {
values[string(val.Value())] = struct{}{}
}
return values, nil
}
return fs.tagValuesByKeyAndExpr(name, key, expr, i.fieldset)
}
// ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression.
func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, condition influxql.Expr, fn func(tags models.Tags) error) error {
fs := i.RetainFileSet()

View File

@ -1114,6 +1114,7 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
if bytes.Compare(bk, tagSet.lastKey) == 1 {
tagSet.lastKey = bk
}
}
// Loop over all keys for each series.