inmem tests passing.
parent
f5f85d65f9
commit
493c1ed0d1
|
@ -7243,13 +7243,14 @@ func TestServer_Query_ShowSeries(t *testing.T) {
|
|||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
var once sync.Once
|
||||
for _, query := range test.queries {
|
||||
t.Run(query.name, func(t *testing.T) {
|
||||
if i == 0 {
|
||||
once.Do(func() {
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
if query.skip {
|
||||
t.Skipf("SKIP:: %s", query.name)
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ type Engine interface {
|
|||
SeriesN() int64
|
||||
|
||||
MeasurementExists(name []byte) (bool, error)
|
||||
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||
// MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
||||
MeasurementFieldSet() *MeasurementFieldSet
|
||||
MeasurementFields(measurement []byte) *MeasurementFields
|
||||
|
@ -68,7 +68,7 @@ type Engine interface {
|
|||
HasTagKey(name, key []byte) (bool, error)
|
||||
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
||||
// MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||
// ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||
TagKeyCardinality(name, key []byte) int
|
||||
|
||||
// Statistics will return statistics relevant to this engine.
|
||||
|
|
|
@ -351,9 +351,11 @@ func (e *Engine) MeasurementExists(name []byte) (bool, error) {
|
|||
return e.index.MeasurementExists(name)
|
||||
}
|
||||
|
||||
/*
|
||||
func (e *Engine) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
return e.index.MeasurementNamesByExpr(expr)
|
||||
}
|
||||
*/
|
||||
|
||||
func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
return e.index.MeasurementNamesByRegex(re)
|
||||
|
@ -392,10 +394,6 @@ func (e *Engine) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byt
|
|||
}
|
||||
*/
|
||||
|
||||
func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
|
||||
return e.index.ForEachMeasurementTagKey(name, fn)
|
||||
}
|
||||
|
||||
func (e *Engine) TagKeyCardinality(name, key []byte) int {
|
||||
return e.index.TagKeyCardinality(name, key)
|
||||
}
|
||||
|
|
493
tsdb/index.go
493
tsdb/index.go
|
@ -25,7 +25,7 @@ type Index interface {
|
|||
|
||||
Database() string
|
||||
MeasurementExists(name []byte) (bool, error)
|
||||
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||
// MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
||||
DropMeasurement(name []byte) error
|
||||
ForEachMeasurementName(fn func(name []byte) error) error
|
||||
|
@ -39,15 +39,18 @@ type Index interface {
|
|||
SeriesN() int64
|
||||
|
||||
HasTagKey(name, key []byte) (bool, error)
|
||||
HasTagValue(name, key, value []byte) (bool, error)
|
||||
|
||||
// TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
|
||||
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
||||
// MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||
|
||||
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||
// ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||
TagKeyCardinality(name, key []byte) int
|
||||
|
||||
// InfluxQL system iterators
|
||||
MeasurementIterator() (MeasurementIterator, error)
|
||||
TagKeyIterator(name []byte) (TagKeyIterator, error)
|
||||
TagValueIterator(auth query.Authorizer, name, key []byte) (TagValueIterator, error)
|
||||
MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
|
||||
TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
|
||||
|
@ -610,7 +613,7 @@ type seriesPointIterator struct {
|
|||
indexSet IndexSet
|
||||
fieldset *MeasurementFieldSet
|
||||
mitr MeasurementIterator
|
||||
sitr SeriesIDIterator
|
||||
keys [][]byte
|
||||
opt query.IteratorOptions
|
||||
|
||||
point query.FloatPoint // reusable point
|
||||
|
@ -658,14 +661,7 @@ func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.Itera
|
|||
func (itr *seriesPointIterator) Close() (err error) {
|
||||
itr.once.Do(func() {
|
||||
if itr.mitr != nil {
|
||||
if e := itr.mitr.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
if itr.sitr != nil {
|
||||
if e := itr.sitr.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
err = itr.mitr.Close()
|
||||
}
|
||||
})
|
||||
return err
|
||||
|
@ -674,13 +670,9 @@ func (itr *seriesPointIterator) Close() (err error) {
|
|||
// Next emits the next point in the iterator.
|
||||
func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
|
||||
for {
|
||||
// Create new series iterator, if necessary.
|
||||
// Read series keys for next measurement if no more keys remaining.
|
||||
// Exit if there are no measurements remaining.
|
||||
if itr.sitr == nil {
|
||||
if itr.mitr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if len(itr.keys) == 0 {
|
||||
m, err := itr.mitr.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -688,28 +680,16 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(m, itr.opt.Condition)
|
||||
if err != nil {
|
||||
if err := itr.readSeriesKeys(m); err != nil {
|
||||
return nil, err
|
||||
} else if sitr == nil {
|
||||
continue
|
||||
}
|
||||
itr.sitr = sitr
|
||||
}
|
||||
|
||||
// Read next series element.
|
||||
e, err := itr.sitr.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if e.SeriesID == 0 {
|
||||
itr.sitr.Close()
|
||||
itr.sitr = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert to a key.
|
||||
name, tags := ParseSeriesKey(itr.sfile.SeriesKey(e.SeriesID))
|
||||
name, tags := ParseSeriesKey(itr.keys[0])
|
||||
key := string(models.MakeKey(name, tags))
|
||||
itr.keys = itr.keys[1:]
|
||||
|
||||
// Write auxiliary fields.
|
||||
for i, f := range itr.opt.Aux {
|
||||
|
@ -723,6 +703,32 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (itr *seriesPointIterator) readSeriesKeys(name []byte) error {
|
||||
sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sitr == nil {
|
||||
return nil
|
||||
}
|
||||
defer sitr.Close()
|
||||
|
||||
// Slurp all series keys.
|
||||
itr.keys = itr.keys[:0]
|
||||
for {
|
||||
elem, err := sitr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if elem.SeriesID == 0 {
|
||||
break
|
||||
}
|
||||
itr.keys = append(itr.keys, itr.sfile.SeriesKey(elem.SeriesID))
|
||||
}
|
||||
|
||||
// Sort keys.
|
||||
sort.Sort(seriesKeys(itr.keys))
|
||||
return nil
|
||||
}
|
||||
|
||||
// MeasurementIterator represents a iterator over a list of measurements.
|
||||
type MeasurementIterator interface {
|
||||
Close() error
|
||||
|
@ -829,6 +835,111 @@ func (itr *measurementMergeIterator) Next() (_ []byte, err error) {
|
|||
return name, nil
|
||||
}
|
||||
|
||||
// TagKeyIterator represents a iterator over a list of tag keys.
|
||||
type TagKeyIterator interface {
|
||||
Close() error
|
||||
Next() ([]byte, error)
|
||||
}
|
||||
|
||||
type TagKeyIterators []TagKeyIterator
|
||||
|
||||
func (a TagKeyIterators) Close() (err error) {
|
||||
for i := range a {
|
||||
if e := a[i].Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// NewTagKeySliceIterator returns a TagKeyIterator that iterates over a slice.
|
||||
func NewTagKeySliceIterator(keys [][]byte) *tagKeySliceIterator {
|
||||
return &tagKeySliceIterator{keys: keys}
|
||||
}
|
||||
|
||||
// tagKeySliceIterator iterates over a slice of tag keys.
|
||||
type tagKeySliceIterator struct {
|
||||
keys [][]byte
|
||||
}
|
||||
|
||||
// Next returns the next tag key in the slice.
|
||||
func (itr *tagKeySliceIterator) Next() ([]byte, error) {
|
||||
if len(itr.keys) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
key := itr.keys[0]
|
||||
itr.keys = itr.keys[1:]
|
||||
return key, nil
|
||||
}
|
||||
|
||||
func (itr *tagKeySliceIterator) Close() error { return nil }
|
||||
|
||||
// MergeTagKeyIterators returns an iterator that merges a set of iterators.
|
||||
func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
|
||||
if len(itrs) == 0 {
|
||||
return nil
|
||||
} else if len(itrs) == 1 {
|
||||
return itrs[0]
|
||||
}
|
||||
|
||||
return &tagKeyMergeIterator{
|
||||
buf: make([][]byte, len(itrs)),
|
||||
itrs: itrs,
|
||||
}
|
||||
}
|
||||
|
||||
type tagKeyMergeIterator struct {
|
||||
buf [][]byte
|
||||
itrs []TagKeyIterator
|
||||
}
|
||||
|
||||
func (itr *tagKeyMergeIterator) Close() error {
|
||||
for i := range itr.itrs {
|
||||
itr.itrs[i].Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next returns the element with the next lowest key across the iterators.
|
||||
//
|
||||
// If multiple iterators contain the same key then the first is returned
|
||||
// and the remaining ones are skipped.
|
||||
func (itr *tagKeyMergeIterator) Next() (_ []byte, err error) {
|
||||
// Find next lowest key amongst the buffers.
|
||||
var key []byte
|
||||
for i, buf := range itr.buf {
|
||||
// Fill buffer.
|
||||
if buf == nil {
|
||||
if buf, err = itr.itrs[i].Next(); err != nil {
|
||||
return nil, err
|
||||
} else if buf != nil {
|
||||
itr.buf[i] = buf
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Find next lowest key.
|
||||
if key == nil || bytes.Compare(buf, key) == -1 {
|
||||
key = buf
|
||||
}
|
||||
}
|
||||
|
||||
// Return nil if no elements remaining.
|
||||
if key == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Merge elements and clear buffers.
|
||||
for i, buf := range itr.buf {
|
||||
if buf == nil || !bytes.Equal(buf, key) {
|
||||
continue
|
||||
}
|
||||
itr.buf[i] = nil
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// TagValueIterator represents a iterator over a list of tag values.
|
||||
type TagValueIterator interface {
|
||||
Close() error
|
||||
|
@ -953,6 +1064,270 @@ func (is IndexSet) FieldSet() *MeasurementFieldSet {
|
|||
return is[0].FieldSet()
|
||||
}
|
||||
|
||||
// DedupeInmemIndexes returns an index set which removes duplicate in-memory indexes.
|
||||
func (is IndexSet) DedupeInmemIndexes() IndexSet {
|
||||
other := make(IndexSet, 0, len(is))
|
||||
|
||||
var hasInmem bool
|
||||
for _, idx := range is {
|
||||
if idx.Type() == "inmem" {
|
||||
if !hasInmem {
|
||||
other = append(other, idx)
|
||||
hasInmem = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
other = append(other, idx)
|
||||
}
|
||||
return other
|
||||
}
|
||||
|
||||
/*
|
||||
// MeasurementNames returns a unique, sorted list of measurements across all indexes.
|
||||
func (is IndexSet) MeasurementNames(cond influxql.Expr) ([][]byte, error) {
|
||||
// Map to deduplicate measurement names across all indexes. This is kind of naive
|
||||
// and could be improved using a sorted merge of the already sorted measurements in
|
||||
// each shard.
|
||||
set := make(map[string]struct{})
|
||||
var names [][]byte
|
||||
for _, idx := range is {
|
||||
a, err := is.MeasurementNamesByExpr(cond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, m := range a {
|
||||
if _, ok := set[string(m)]; !ok {
|
||||
set[string(m)] = struct{}{}
|
||||
names = append(names, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
bytesutil.Sort(names)
|
||||
|
||||
return names, nil
|
||||
}
|
||||
|
||||
*/
|
||||
func (is IndexSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
// Return filtered list if expression exists.
|
||||
if expr != nil {
|
||||
return is.measurementNamesByExpr(expr)
|
||||
}
|
||||
|
||||
itr, err := is.MeasurementIterator()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
defer itr.Close()
|
||||
|
||||
// Iterate over all measurements if no condition exists.
|
||||
var names [][]byte
|
||||
for {
|
||||
e, err := itr.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if e == nil {
|
||||
break
|
||||
}
|
||||
names = append(names, e)
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func (is IndexSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
if expr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch e := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
|
||||
}
|
||||
|
||||
// Retrieve value or regex expression from RHS.
|
||||
var value string
|
||||
var regex *regexp.Regexp
|
||||
if influxql.IsRegexOp(e.Op) {
|
||||
re, ok := e.RHS.(*influxql.RegexLiteral)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
|
||||
}
|
||||
regex = re.Val
|
||||
} else {
|
||||
s, ok := e.RHS.(*influxql.StringLiteral)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
|
||||
}
|
||||
value = s.Val
|
||||
}
|
||||
|
||||
// Match on name, if specified.
|
||||
if tag.Val == "_name" {
|
||||
return is.measurementNamesByNameFilter(e.Op, value, regex)
|
||||
} else if influxql.IsSystemName(tag.Val) {
|
||||
return nil, nil
|
||||
}
|
||||
return is.measurementNamesByTagFilter(e.Op, tag.Val, value, regex)
|
||||
|
||||
case influxql.OR, influxql.AND:
|
||||
lhs, err := is.measurementNamesByExpr(e.LHS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rhs, err := is.measurementNamesByExpr(e.RHS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if e.Op == influxql.OR {
|
||||
return bytesutil.Union(lhs, rhs), nil
|
||||
}
|
||||
return bytesutil.Intersect(lhs, rhs), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid tag comparison operator")
|
||||
}
|
||||
|
||||
case *influxql.ParenExpr:
|
||||
return is.measurementNamesByExpr(e.Expr)
|
||||
default:
|
||||
return nil, fmt.Errorf("%#v", expr)
|
||||
}
|
||||
}
|
||||
|
||||
// measurementNamesByNameFilter returns matching measurement names in sorted order.
|
||||
func (is IndexSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
|
||||
itr, err := is.MeasurementIterator()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
defer itr.Close()
|
||||
|
||||
var names [][]byte
|
||||
for {
|
||||
e, err := itr.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if e == nil {
|
||||
break
|
||||
}
|
||||
|
||||
var matched bool
|
||||
switch op {
|
||||
case influxql.EQ:
|
||||
matched = string(e) == val
|
||||
case influxql.NEQ:
|
||||
matched = string(e) != val
|
||||
case influxql.EQREGEX:
|
||||
matched = regex.Match(e)
|
||||
case influxql.NEQREGEX:
|
||||
matched = !regex.Match(e)
|
||||
}
|
||||
|
||||
if matched {
|
||||
names = append(names, e)
|
||||
}
|
||||
}
|
||||
bytesutil.Sort(names)
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func (is IndexSet) measurementNamesByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
|
||||
var names [][]byte
|
||||
|
||||
mitr, err := is.MeasurementIterator()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if mitr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
defer mitr.Close()
|
||||
|
||||
for {
|
||||
me, err := mitr.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if me == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// If the operator is non-regex, only check the specified value.
|
||||
var tagMatch bool
|
||||
if op == influxql.EQ || op == influxql.NEQ {
|
||||
if ok, err := is.HasTagValue(me, []byte(key), []byte(val)); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
tagMatch = true
|
||||
}
|
||||
} else {
|
||||
// Else, the operator is a regex and we have to check all tag
|
||||
// values against the regular expression.
|
||||
if err := func() error {
|
||||
// TODO(benbjohnson): Add auth.
|
||||
vitr, err := is.TagValueIterator(nil, me, []byte(key))
|
||||
if err != nil {
|
||||
return err
|
||||
} else if vitr == nil {
|
||||
return nil
|
||||
}
|
||||
defer vitr.Close()
|
||||
|
||||
for {
|
||||
if ve, err := vitr.Next(); err != nil {
|
||||
return err
|
||||
} else if ve == nil {
|
||||
return nil
|
||||
} else if regex.Match(ve) {
|
||||
tagMatch = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// XNOR gate
|
||||
//
|
||||
// tags match | operation is EQ | measurement matches
|
||||
// --------------------------------------------------
|
||||
// True | True | True
|
||||
// True | False | False
|
||||
// False | True | False
|
||||
// False | False | True
|
||||
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) {
|
||||
names = append(names, me)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
bytesutil.Sort(names)
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// HasTagValue returns true if the tag value exists in any index.
|
||||
func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) {
|
||||
for _, idx := range is {
|
||||
if ok, err := idx.HasTagValue(name, key, value); err != nil {
|
||||
return false, err
|
||||
} else if ok {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// MeasurementIterator returns an iterator over all measurements in the index.
|
||||
func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) {
|
||||
a := make([]MeasurementIterator, 0, len(is))
|
||||
|
@ -968,6 +1343,21 @@ func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) {
|
|||
return MergeMeasurementIterators(a...), nil
|
||||
}
|
||||
|
||||
// TagKeyIterator returns a key iterator for a measurement.
|
||||
func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) {
|
||||
a := make([]TagKeyIterator, 0, len(is))
|
||||
for _, idx := range is {
|
||||
itr, err := idx.TagKeyIterator(name)
|
||||
if err != nil {
|
||||
TagKeyIterators(a).Close()
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
}
|
||||
}
|
||||
return MergeTagKeyIterators(a...), nil
|
||||
}
|
||||
|
||||
// TagValueIterator returns a value iterator for a tag key.
|
||||
func (is IndexSet) TagValueIterator(auth query.Authorizer, name, key []byte) (TagValueIterator, error) {
|
||||
a := make([]TagValueIterator, 0, len(is))
|
||||
|
@ -999,6 +1389,45 @@ func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, e
|
|||
return MergeSeriesIDIterators(a...), nil
|
||||
}
|
||||
|
||||
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
|
||||
// the provided function.
|
||||
func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
|
||||
itr, err := is.TagKeyIterator(name)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if itr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
key, err := itr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if key == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := fn(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
|
||||
func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
|
||||
keys := make(map[string]struct{})
|
||||
for _, idx := range is {
|
||||
m, err := idx.MeasurementTagKeysByExpr(name, expr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k := range m {
|
||||
keys[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
|
||||
func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
|
||||
a := make([]SeriesIDIterator, 0, len(is))
|
||||
|
|
|
@ -258,15 +258,15 @@ func (i *Index) HasTagKey(name, key []byte) (bool, error) {
|
|||
}
|
||||
|
||||
// HasTagValue returns true if tag value exists.
|
||||
func (i *Index) HasTagValue(name, key, value []byte) bool {
|
||||
func (i *Index) HasTagValue(name, key, value []byte) (bool, error) {
|
||||
i.mu.RLock()
|
||||
mm := i.measurements[string(name)]
|
||||
i.mu.RUnlock()
|
||||
|
||||
if mm == nil {
|
||||
return false
|
||||
return false, nil
|
||||
}
|
||||
return mm.HasTagKeyValue(key, value)
|
||||
return mm.HasTagKeyValue(key, value), nil
|
||||
}
|
||||
|
||||
// TagValueN returns the cardinality of a tag value.
|
||||
|
@ -739,6 +739,24 @@ func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID
|
|||
return tsdb.NewSeriesIDSliceIterator([]uint64(m.SeriesIDsByTagValue(key, value))), nil
|
||||
}
|
||||
|
||||
func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
m := i.measurements[string(name)]
|
||||
if m == nil {
|
||||
return nil, nil
|
||||
}
|
||||
keys := m.TagKeys()
|
||||
sort.Strings(keys)
|
||||
|
||||
a := make([][]byte, len(keys))
|
||||
for i := range a {
|
||||
a[i] = []byte(keys[i])
|
||||
}
|
||||
return tsdb.NewTagKeySliceIterator(a), nil
|
||||
}
|
||||
|
||||
func (i *Index) TagValueIterator(auth query.Authorizer, name, key []byte) (tsdb.TagValueIterator, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
@ -959,7 +977,7 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
|
|||
tags := tagsSlice[i]
|
||||
for _, tag := range tags {
|
||||
// Skip if the tag value already exists.
|
||||
if idx.HasTagValue(name, tag.Key, tag.Value) {
|
||||
if ok, _ := idx.HasTagValue(name, tag.Key, tag.Value); ok {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"regexp"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/pkg/estimator/hll"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
@ -384,168 +383,6 @@ func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Series
|
|||
return tsdb.MergeSeriesIDIterators(a...)
|
||||
}
|
||||
|
||||
func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
// Return filtered list if expression exists.
|
||||
if expr != nil {
|
||||
return fs.measurementNamesByExpr(expr)
|
||||
}
|
||||
|
||||
itr := fs.MeasurementIterator()
|
||||
if itr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Iterate over all measurements if no condition exists.
|
||||
var names [][]byte
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
names = append(names, e.Name())
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
if expr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch e := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
|
||||
}
|
||||
|
||||
// Retrieve value or regex expression from RHS.
|
||||
var value string
|
||||
var regex *regexp.Regexp
|
||||
if influxql.IsRegexOp(e.Op) {
|
||||
re, ok := e.RHS.(*influxql.RegexLiteral)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
|
||||
}
|
||||
regex = re.Val
|
||||
} else {
|
||||
s, ok := e.RHS.(*influxql.StringLiteral)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
|
||||
}
|
||||
value = s.Val
|
||||
}
|
||||
|
||||
// Match on name, if specified.
|
||||
if tag.Val == "_name" {
|
||||
return fs.measurementNamesByNameFilter(e.Op, value, regex), nil
|
||||
} else if influxql.IsSystemName(tag.Val) {
|
||||
return nil, nil
|
||||
}
|
||||
return fs.measurementNamesByTagFilter(e.Op, tag.Val, value, regex), nil
|
||||
|
||||
case influxql.OR, influxql.AND:
|
||||
lhs, err := fs.measurementNamesByExpr(e.LHS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rhs, err := fs.measurementNamesByExpr(e.RHS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if e.Op == influxql.OR {
|
||||
return bytesutil.Union(lhs, rhs), nil
|
||||
}
|
||||
return bytesutil.Intersect(lhs, rhs), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid tag comparison operator")
|
||||
}
|
||||
|
||||
case *influxql.ParenExpr:
|
||||
return fs.measurementNamesByExpr(e.Expr)
|
||||
default:
|
||||
return nil, fmt.Errorf("%#v", expr)
|
||||
}
|
||||
}
|
||||
|
||||
// measurementNamesByNameFilter returns matching measurement names in sorted order.
|
||||
func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
|
||||
itr := fs.MeasurementIterator()
|
||||
if itr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var names [][]byte
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
var matched bool
|
||||
switch op {
|
||||
case influxql.EQ:
|
||||
matched = string(e.Name()) == val
|
||||
case influxql.NEQ:
|
||||
matched = string(e.Name()) != val
|
||||
case influxql.EQREGEX:
|
||||
matched = regex.Match(e.Name())
|
||||
case influxql.NEQREGEX:
|
||||
matched = !regex.Match(e.Name())
|
||||
}
|
||||
|
||||
if matched {
|
||||
names = append(names, e.Name())
|
||||
}
|
||||
}
|
||||
bytesutil.Sort(names)
|
||||
return names
|
||||
}
|
||||
|
||||
func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte {
|
||||
var names [][]byte
|
||||
|
||||
mitr := fs.MeasurementIterator()
|
||||
if mitr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for me := mitr.Next(); me != nil; me = mitr.Next() {
|
||||
// If the operator is non-regex, only check the specified value.
|
||||
var tagMatch bool
|
||||
if op == influxql.EQ || op == influxql.NEQ {
|
||||
if fs.HasTagValue(me.Name(), []byte(key), []byte(val)) {
|
||||
tagMatch = true
|
||||
}
|
||||
} else {
|
||||
// Else, the operator is a regex and we have to check all tag
|
||||
// values against the regular expression.
|
||||
vitr := fs.TagValueIterator(me.Name(), []byte(key))
|
||||
if vitr != nil {
|
||||
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
|
||||
if regex.Match(ve.Value()) {
|
||||
tagMatch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// XNOR gate
|
||||
//
|
||||
// tags match | operation is EQ | measurement matches
|
||||
// --------------------------------------------------
|
||||
// True | True | True
|
||||
// True | False | False
|
||||
// False | True | False
|
||||
// False | False | True
|
||||
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) {
|
||||
names = append(names, me.Name())
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
bytesutil.Sort(names)
|
||||
return names
|
||||
}
|
||||
|
||||
// MeasurementsSketches returns the merged measurement sketches for the FileSet.
|
||||
func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
|
||||
sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
|
||||
|
@ -643,6 +480,26 @@ func (itr *fileSetMeasurementIterator) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// fileSetTagKeyIterator attaches a fileset to an iterator that is released on close.
|
||||
type fileSetTagKeyIterator struct {
|
||||
once sync.Once
|
||||
fs *FileSet
|
||||
itr tsdb.TagKeyIterator
|
||||
}
|
||||
|
||||
func newFileSetTagKeyIterator(fs *FileSet, itr tsdb.TagKeyIterator) *fileSetTagKeyIterator {
|
||||
return &fileSetTagKeyIterator{fs: fs, itr: itr}
|
||||
}
|
||||
|
||||
func (itr *fileSetTagKeyIterator) Next() ([]byte, error) {
|
||||
return itr.itr.Next()
|
||||
}
|
||||
|
||||
func (itr *fileSetTagKeyIterator) Close() error {
|
||||
itr.once.Do(func() { itr.fs.Release() })
|
||||
return nil
|
||||
}
|
||||
|
||||
// fileSetTagValueIterator attaches a fileset to an iterator that is released on close.
|
||||
type fileSetTagValueIterator struct {
|
||||
once sync.Once
|
||||
|
|
|
@ -411,12 +411,14 @@ func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator,
|
|||
return tsdb.MergeSeriesIDIterators(itrs...), nil
|
||||
}
|
||||
|
||||
/*
|
||||
// MeasurementNamesByExpr returns measurement names for the provided expression.
|
||||
func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
return i.fetchByteValues(func(idx int) ([][]byte, error) {
|
||||
return i.partitions[idx].MeasurementNamesByExpr(expr)
|
||||
})
|
||||
}
|
||||
*/
|
||||
|
||||
// MeasurementNamesByRegex returns measurement names for the provided regex.
|
||||
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
|
@ -605,6 +607,63 @@ func (i *Index) HasTagKey(name, key []byte) (bool, error) {
|
|||
return atomic.LoadUint32(&found) == 1, nil
|
||||
}
|
||||
|
||||
// HasTagValue returns true if tag value exists.
|
||||
func (i *Index) HasTagValue(name, key, value []byte) (bool, error) {
|
||||
n := i.availableThreads()
|
||||
|
||||
// Store errors
|
||||
var found uint32 // Use this to signal we found the tag key.
|
||||
errC := make(chan error, i.PartitionN)
|
||||
|
||||
// Check each partition for the tag key concurrently.
|
||||
var pidx uint32 // Index of maximum Partition being worked on.
|
||||
for k := 0; k < n; k++ {
|
||||
go func() {
|
||||
for {
|
||||
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check
|
||||
if idx >= len(i.partitions) {
|
||||
return // No more work.
|
||||
}
|
||||
|
||||
// Check if the tag key has already been found. If it has, we
|
||||
// don't need to check this partition and can just move on.
|
||||
if atomic.LoadUint32(&found) == 1 {
|
||||
errC <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
b, err := i.partitions[idx].HasTagValue(name, key, value)
|
||||
errC <- err
|
||||
if b {
|
||||
atomic.StoreUint32(&found, 1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Check for error
|
||||
for i := 0; i < cap(errC); i++ {
|
||||
if err := <-errC; err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we found the tag key.
|
||||
return atomic.LoadUint32(&found) == 1, nil
|
||||
}
|
||||
|
||||
// TagKeyIterator returns an iterator for all keys across a single measurement.
|
||||
func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) {
|
||||
a := make([]tsdb.TagKeyIterator, 0, len(i.partitions))
|
||||
for _, p := range i.partitions {
|
||||
itr := p.TagKeyIterator(name)
|
||||
if itr != nil {
|
||||
a = append(a, itr)
|
||||
}
|
||||
}
|
||||
return tsdb.MergeTagKeyIterators(a...), nil
|
||||
}
|
||||
|
||||
// TagValueIterator returns an iterator for all values across a single key.
|
||||
func (i *Index) TagValueIterator(auth query.Authorizer, name, key []byte) (tsdb.TagValueIterator, error) {
|
||||
a := make([]tsdb.TagValueIterator, 0, len(i.partitions))
|
||||
|
@ -770,17 +829,6 @@ func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte
|
|||
}
|
||||
*/
|
||||
|
||||
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
|
||||
// the provided function.
|
||||
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
|
||||
for j := 0; j < len(i.partitions); j++ {
|
||||
if err := i.partitions[j].ForEachMeasurementTagKey(name, fn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TagKeyCardinality always returns zero.
|
||||
// It is not possible to determine cardinality of tags across index files, and
|
||||
// thus it cannot be done across partitions.
|
||||
|
|
|
@ -408,6 +408,7 @@ func (i *Partition) MeasurementExists(name []byte) (bool, error) {
|
|||
return m != nil && !m.Deleted(), nil
|
||||
}
|
||||
|
||||
/*
|
||||
func (i *Partition) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||
fs := i.RetainFileSet()
|
||||
defer fs.Release()
|
||||
|
@ -417,6 +418,7 @@ func (i *Partition) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
|||
// Clone byte slices since they will be used after the fileset is released.
|
||||
return bytesutil.CloneSlice(names), err
|
||||
}
|
||||
*/
|
||||
|
||||
func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
fs := i.RetainFileSet()
|
||||
|
@ -601,6 +603,24 @@ func (i *Partition) HasTagKey(name, key []byte) (bool, error) {
|
|||
return fs.HasTagKey(name, key), nil
|
||||
}
|
||||
|
||||
// HasTagValue returns true if tag value exists.
|
||||
func (i *Partition) HasTagValue(name, key, value []byte) (bool, error) {
|
||||
fs := i.RetainFileSet()
|
||||
defer fs.Release()
|
||||
return fs.HasTagValue(name, key, value), nil
|
||||
}
|
||||
|
||||
// TagKeyIterator returns an iterator for all keys across a single measurement.
|
||||
func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator {
|
||||
fs := i.RetainFileSet()
|
||||
itr := fs.TagKeyIterator(name)
|
||||
if itr == nil {
|
||||
fs.Release()
|
||||
return nil
|
||||
}
|
||||
return newFileSetTagKeyIterator(fs, NewTSDBTagKeyIteratorAdapter(itr))
|
||||
}
|
||||
|
||||
// TagValueIterator returns an iterator for all values across a single key.
|
||||
func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator {
|
||||
fs := i.RetainFileSet()
|
||||
|
|
|
@ -153,6 +153,34 @@ type TagKeyIterator interface {
|
|||
Next() TagKeyElem
|
||||
}
|
||||
|
||||
// tsdbTagKeyIteratorAdapter wraps TagKeyIterator to match the TSDB interface.
|
||||
// This is needed because TSDB doesn't have a concept of "deleted" tag keys.
|
||||
type tsdbTagKeyIteratorAdapter struct {
|
||||
itr TagKeyIterator
|
||||
}
|
||||
|
||||
// NewTSDBTagKeyIteratorAdapter return an iterator which implements tsdb.TagKeyIterator.
|
||||
func NewTSDBTagKeyIteratorAdapter(itr TagKeyIterator) tsdb.TagKeyIterator {
|
||||
if itr == nil {
|
||||
return nil
|
||||
}
|
||||
return &tsdbTagKeyIteratorAdapter{itr: itr}
|
||||
}
|
||||
|
||||
func (itr *tsdbTagKeyIteratorAdapter) Close() error { return nil }
|
||||
|
||||
func (itr *tsdbTagKeyIteratorAdapter) Next() ([]byte, error) {
|
||||
for {
|
||||
e := itr.itr.Next()
|
||||
if e == nil {
|
||||
return nil, nil
|
||||
} else if e.Deleted() {
|
||||
continue
|
||||
}
|
||||
return e.Key(), nil
|
||||
}
|
||||
}
|
||||
|
||||
// MergeTagKeyIterators returns an iterator that merges a set of iterators.
|
||||
// Iterators that are first in the list take precendence and a deletion by those
|
||||
// early iterators will invalidate elements by later iterators.
|
||||
|
|
|
@ -723,6 +723,7 @@ func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
|
|||
return engine.MeasurementsSketches()
|
||||
}
|
||||
|
||||
/*
|
||||
// MeasurementNamesByExpr returns names of measurements matching the condition.
|
||||
// If cond is nil then all measurement names are returned.
|
||||
func (s *Shard) MeasurementNamesByExpr(cond influxql.Expr) ([][]byte, error) {
|
||||
|
@ -732,6 +733,7 @@ func (s *Shard) MeasurementNamesByExpr(cond influxql.Expr) ([][]byte, error) {
|
|||
}
|
||||
return engine.MeasurementNamesByExpr(cond)
|
||||
}
|
||||
*/
|
||||
|
||||
// MeasurementNamesByRegex returns names of measurements matching the regular expression.
|
||||
func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
|
@ -800,12 +802,12 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt
|
|||
|
||||
switch m.SystemIterator {
|
||||
case "_fieldKeys":
|
||||
return NewFieldKeysIterator(engine, opt)
|
||||
return NewFieldKeysIterator(s, opt)
|
||||
case "_series":
|
||||
// TODO(benbjohnson): Move up to the Shards.CreateIterator().
|
||||
return NewSeriesPointIterator(s.sfile, IndexSet{s.index}, engine.MeasurementFieldSet(), opt)
|
||||
case "_tagKeys":
|
||||
return NewTagKeysIterator(engine, opt)
|
||||
return NewTagKeysIterator(s, opt)
|
||||
}
|
||||
return engine.CreateIterator(ctx, m.Name, opt)
|
||||
}
|
||||
|
@ -869,7 +871,7 @@ func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influx
|
|||
}
|
||||
}
|
||||
|
||||
if err := engine.ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
|
||||
if err := (IndexSet{s.index}).ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
|
||||
dimensions[string(key)] = struct{}{}
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
@ -1062,14 +1064,6 @@ func (s *Shard) ForEachMeasurementName(fn func(name []byte) error) error {
|
|||
return engine.ForEachMeasurementName(fn)
|
||||
}
|
||||
|
||||
func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
|
||||
engine, err := s.engine()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return engine.ForEachMeasurementTagKey(name, fn)
|
||||
}
|
||||
|
||||
func (s *Shard) TagKeyCardinality(name, key []byte) int {
|
||||
engine, err := s.engine()
|
||||
if err != nil {
|
||||
|
@ -1529,11 +1523,11 @@ type Field struct {
|
|||
|
||||
// NewFieldKeysIterator returns an iterator that can be iterated over to
|
||||
// retrieve field keys.
|
||||
func NewFieldKeysIterator(engine Engine, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
itr := &fieldKeysIterator{engine: engine}
|
||||
func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
itr := &fieldKeysIterator{shard: sh}
|
||||
|
||||
// Retrieve measurements from shard. Filter if condition specified.
|
||||
names, err := engine.MeasurementNamesByExpr(opt.Condition)
|
||||
names, err := (IndexSet{sh.index}).MeasurementNamesByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1544,9 +1538,9 @@ func NewFieldKeysIterator(engine Engine, opt query.IteratorOptions) (query.Itera
|
|||
|
||||
// fieldKeysIterator iterates over measurements and gets field keys from each measurement.
|
||||
type fieldKeysIterator struct {
|
||||
engine Engine
|
||||
names [][]byte // remaining measurement names
|
||||
buf struct {
|
||||
shard *Shard
|
||||
names [][]byte // remaining measurement names
|
||||
buf struct {
|
||||
name []byte // current measurement name
|
||||
fields []Field // current measurement's fields
|
||||
}
|
||||
|
@ -1568,7 +1562,7 @@ func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) {
|
|||
}
|
||||
|
||||
itr.buf.name = itr.names[0]
|
||||
mf := itr.engine.MeasurementFields(itr.buf.name)
|
||||
mf := itr.shard.MeasurementFields(itr.buf.name)
|
||||
if mf != nil {
|
||||
fset := mf.FieldSet()
|
||||
if len(fset) == 0 {
|
||||
|
@ -1604,10 +1598,10 @@ func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) {
|
|||
}
|
||||
|
||||
// NewTagKeysIterator returns a new instance of TagKeysIterator.
|
||||
func NewTagKeysIterator(engine Engine, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
fn := func(name []byte) ([][]byte, error) {
|
||||
var keys [][]byte
|
||||
if err := engine.ForEachMeasurementTagKey(name, func(key []byte) error {
|
||||
if err := (IndexSet{sh.index}).ForEachMeasurementTagKey(name, func(key []byte) error {
|
||||
keys = append(keys, key)
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
@ -1615,15 +1609,15 @@ func NewTagKeysIterator(engine Engine, opt query.IteratorOptions) (query.Iterato
|
|||
}
|
||||
return keys, nil
|
||||
}
|
||||
return newMeasurementKeysIterator(engine, fn, opt)
|
||||
return newMeasurementKeysIterator(sh, fn, opt)
|
||||
}
|
||||
|
||||
// measurementKeyFunc is the function called by measurementKeysIterator.
|
||||
type measurementKeyFunc func(name []byte) ([][]byte, error)
|
||||
|
||||
func newMeasurementKeysIterator(engine Engine, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) {
|
||||
func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) {
|
||||
itr := &measurementKeysIterator{fn: fn}
|
||||
names, err := engine.MeasurementNamesByExpr(opt.Condition)
|
||||
names, err := (IndexSet{sh.index}).MeasurementNamesByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
260
tsdb/store.go
260
tsdb/store.go
|
@ -16,7 +16,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/pkg/limiter"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
|
@ -1023,34 +1022,15 @@ func (s *Store) MeasurementNames(database string, cond influxql.Expr) ([][]byte,
|
|||
shards := s.filterShards(byDatabase(database))
|
||||
s.mu.RUnlock()
|
||||
|
||||
// If we're using the inmem index then all shards contain a duplicate
|
||||
// version of the global index. We don't need to iterate over all shards
|
||||
// since we have everything we need from the first shard.
|
||||
if len(shards) > 0 && shards[0].IndexType() == "inmem" {
|
||||
shards = shards[:1]
|
||||
}
|
||||
|
||||
// Map to deduplicate measurement names across all shards. This is kind of naive
|
||||
// and could be improved using a sorted merge of the already sorted measurements in
|
||||
// each shard.
|
||||
set := make(map[string]struct{})
|
||||
var names [][]byte
|
||||
// Build indexset.
|
||||
is := make(IndexSet, 0, len(shards))
|
||||
for _, sh := range shards {
|
||||
a, err := sh.MeasurementNamesByExpr(cond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, m := range a {
|
||||
if _, ok := set[string(m)]; !ok {
|
||||
set[string(m)] = struct{}{}
|
||||
names = append(names, m)
|
||||
}
|
||||
if sh.index != nil {
|
||||
is = append(is, sh.index)
|
||||
}
|
||||
}
|
||||
bytesutil.Sort(names)
|
||||
|
||||
return names, nil
|
||||
is = is.DedupeInmemIndexes()
|
||||
return is.MeasurementNamesByExpr(cond)
|
||||
}
|
||||
|
||||
// MeasurementSeriesCounts returns the number of measurements and series in all
|
||||
|
@ -1115,87 +1095,67 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
|
|||
}), nil)
|
||||
|
||||
// Get all the shards we're interested in.
|
||||
shards := make([]*Shard, 0, len(shardIDs))
|
||||
var sfile *SeriesFile
|
||||
is := make(IndexSet, 0, len(shardIDs))
|
||||
s.mu.RLock()
|
||||
for _, sid := range shardIDs {
|
||||
shard, ok := s.shards[sid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
shards = append(shards, shard)
|
||||
sfile = shard.sfile
|
||||
is = append(is, shard.index)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
// If we're using the inmem index then all shards contain a duplicate
|
||||
// version of the global index. We don't need to iterate over all shards
|
||||
// since we have everything we need from the first shard.
|
||||
if len(shards) > 0 && shards[0].IndexType() == "inmem" {
|
||||
shards = shards[:1]
|
||||
}
|
||||
|
||||
// Determine list of measurements.
|
||||
nameSet := make(map[string]struct{})
|
||||
for _, sh := range shards {
|
||||
names, err := sh.MeasurementNamesByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, name := range names {
|
||||
nameSet[string(name)] = struct{}{}
|
||||
}
|
||||
is = is.DedupeInmemIndexes()
|
||||
names, err := is.MeasurementNamesByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sort names.
|
||||
names := make([]string, 0, len(nameSet))
|
||||
for name := range nameSet {
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
|
||||
// Iterate over each measurement.
|
||||
var results []TagKeys
|
||||
for _, name := range names {
|
||||
// Build keyset over all shards for measurement.
|
||||
keySet := make(map[string]struct{})
|
||||
for _, sh := range shards {
|
||||
shardKeySet, err := sh.MeasurementTagKeysByExpr([]byte(name), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(shardKeySet) == 0 {
|
||||
continue
|
||||
}
|
||||
finalKeySet := make(map[string]struct{})
|
||||
|
||||
// Sort the tag keys.
|
||||
shardKeys := make([]string, 0, len(shardKeySet))
|
||||
for k := range shardKeySet {
|
||||
shardKeys = append(shardKeys, k)
|
||||
}
|
||||
sort.Strings(shardKeys)
|
||||
|
||||
// Filter against tag values, skip if no values exist.
|
||||
shardValues, err := sh.MeasurementTagKeyValuesByExpr(auth, []byte(name), shardKeys, filterExpr, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range shardKeys {
|
||||
if len(shardValues[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
keySet[shardKeys[i]] = struct{}{}
|
||||
}
|
||||
// Build keyset over all indexes for measurement.
|
||||
keySet, err := is.MeasurementTagKeysByExpr(name, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(keySet) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Sort key set.
|
||||
// Sort the tag keys.
|
||||
keys := make([]string, 0, len(keySet))
|
||||
for key := range keySet {
|
||||
keys = append(keys, key)
|
||||
for k := range keySet {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
// Filter against tag values, skip if no values exist.
|
||||
values, err := is.MeasurementTagKeyValuesByExpr(auth, sfile, name, keys, filterExpr, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range keys {
|
||||
if len(values[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
finalKeySet[keys[i]] = struct{}{}
|
||||
}
|
||||
|
||||
finalKeys := make([]string, 0, len(finalKeySet))
|
||||
for k := range finalKeySet {
|
||||
finalKeys = append(finalKeys, k)
|
||||
}
|
||||
|
||||
// Add to resultset.
|
||||
results = append(results, TagKeys{
|
||||
Measurement: name,
|
||||
Measurement: string(name),
|
||||
Keys: keys,
|
||||
})
|
||||
}
|
||||
|
@ -1267,102 +1227,96 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
|
|||
}), nil)
|
||||
|
||||
// Build index set to work on.
|
||||
shards := make([]*Shard, 0, len(shardIDs))
|
||||
var sfile *SeriesFile
|
||||
is := make(IndexSet, 0, len(shardIDs))
|
||||
s.mu.RLock()
|
||||
for _, sid := range shardIDs {
|
||||
shard, ok := s.shards[sid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
shards = append(shards, shard)
|
||||
sfile = shard.sfile
|
||||
is = append(is, shard.index)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
// If we're using the inmem index then all shards contain a duplicate
|
||||
// version of the global index. We don't need to iterate over all shards
|
||||
// since we have everything we need from the first shard.
|
||||
if len(shards) > 0 && shards[0].IndexType() == "inmem" {
|
||||
shards = shards[:1]
|
||||
}
|
||||
is = is.DedupeInmemIndexes()
|
||||
|
||||
// Stores each list of TagValues for each measurement.
|
||||
var allResults []tagValues
|
||||
var maxMeasurements int // Hint as to lower bound on number of measurements.
|
||||
for _, sh := range shards {
|
||||
// names will be sorted by MeasurementNamesByExpr.
|
||||
names, err := sh.MeasurementNamesByExpr(measurementExpr)
|
||||
// names will be sorted by MeasurementNamesByExpr.
|
||||
names, err := is.MeasurementNamesByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(names) > maxMeasurements {
|
||||
maxMeasurements = len(names)
|
||||
}
|
||||
|
||||
if allResults == nil {
|
||||
allResults = make([]tagValues, 0, len(is)*len(names)) // Assuming all series in all shards.
|
||||
}
|
||||
|
||||
// Iterate over each matching measurement in the shard. For each
|
||||
// measurement we'll get the matching tag keys (e.g., when a WITH KEYS)
|
||||
// statement is used, and we'll then use those to fetch all the relevant
|
||||
// values from matching series. Series may be filtered using a WHERE
|
||||
// filter.
|
||||
for _, name := range names {
|
||||
// Determine a list of keys from condition.
|
||||
keySet, err := is.MeasurementTagKeysByExpr(name, cond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(names) > maxMeasurements {
|
||||
maxMeasurements = len(names)
|
||||
if len(keySet) == 0 {
|
||||
// No matching tag keys for this measurement
|
||||
continue
|
||||
}
|
||||
|
||||
if allResults == nil {
|
||||
allResults = make([]tagValues, 0, len(shards)*len(names)) // Assuming all series in all shards.
|
||||
result := tagValues{
|
||||
name: name,
|
||||
keys: make([]string, 0, len(keySet)),
|
||||
}
|
||||
|
||||
// Iterate over each matching measurement in the shard. For each
|
||||
// measurement we'll get the matching tag keys (e.g., when a WITH KEYS)
|
||||
// statement is used, and we'll then use those to fetch all the relevant
|
||||
// values from matching series. Series may be filtered using a WHERE
|
||||
// filter.
|
||||
for _, name := range names {
|
||||
// Determine a list of keys from condition.
|
||||
keySet, err := sh.MeasurementTagKeysByExpr(name, cond)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Add the keys to the tagValues and sort them.
|
||||
for k := range keySet {
|
||||
result.keys = append(result.keys, k)
|
||||
}
|
||||
sort.Sort(sort.StringSlice(result.keys))
|
||||
|
||||
if len(keySet) == 0 {
|
||||
// No matching tag keys for this measurement
|
||||
// get all the tag values for each key in the keyset.
|
||||
// Each slice in the results contains the sorted values associated
|
||||
// associated with each tag key for the measurement from the key set.
|
||||
if result.values, err = is.MeasurementTagKeyValuesByExpr(auth, sfile, name, result.keys, filterExpr, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// remove any tag keys that didn't have any authorized values
|
||||
j := 0
|
||||
for i := range result.keys {
|
||||
if len(result.values[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
result := tagValues{
|
||||
name: name,
|
||||
keys: make([]string, 0, len(keySet)),
|
||||
}
|
||||
result.keys[j] = result.keys[i]
|
||||
result.values[j] = result.values[i]
|
||||
j++
|
||||
}
|
||||
result.keys = result.keys[:j]
|
||||
result.values = result.values[:j]
|
||||
|
||||
// Add the keys to the tagValues and sort them.
|
||||
for k := range keySet {
|
||||
result.keys = append(result.keys, k)
|
||||
}
|
||||
sort.Sort(sort.StringSlice(result.keys))
|
||||
|
||||
// get all the tag values for each key in the keyset.
|
||||
// Each slice in the results contains the sorted values associated
|
||||
// associated with each tag key for the measurement from the key set.
|
||||
if result.values, err = sh.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// remove any tag keys that didn't have any authorized values
|
||||
j := 0
|
||||
for i := range result.keys {
|
||||
if len(result.values[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
result.keys[j] = result.keys[i]
|
||||
result.values[j] = result.values[i]
|
||||
j++
|
||||
}
|
||||
result.keys = result.keys[:j]
|
||||
result.values = result.values[:j]
|
||||
|
||||
// only include result if there are keys with values
|
||||
if len(result.keys) > 0 {
|
||||
allResults = append(allResults, result)
|
||||
}
|
||||
// only include result if there are keys with values
|
||||
if len(result.keys) > 0 {
|
||||
allResults = append(allResults, result)
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]TagValues, 0, maxMeasurements)
|
||||
|
||||
// We need to sort all results by measurement name.
|
||||
if len(shards) > 1 {
|
||||
if len(is) > 1 {
|
||||
sort.Sort(tagValuesSlice(allResults))
|
||||
}
|
||||
|
||||
|
@ -1370,7 +1324,7 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
|
|||
var i, j int
|
||||
// Used as a temporary buffer in mergeTagValues. There can be at most len(shards)
|
||||
// instances of tagValues for a given measurement.
|
||||
idxBuf := make([][2]int, 0, len(shards))
|
||||
idxBuf := make([][2]int, 0, len(is))
|
||||
for i < len(allResults) {
|
||||
// Gather all occurrences of the same measurement for merging.
|
||||
for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) {
|
||||
|
@ -1380,7 +1334,7 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
|
|||
// An invariant is that there can't be more than n instances of tag
|
||||
// key value pairs for a given measurement, where n is the number of
|
||||
// shards.
|
||||
if got, exp := j-i+1, len(shards); got > exp {
|
||||
if got, exp := j-i+1, len(is); got > exp {
|
||||
return nil, fmt.Errorf("unexpected results returned engine. Got %d measurement sets for %d shards", got, exp)
|
||||
}
|
||||
|
||||
|
@ -1556,15 +1510,15 @@ func (s *Store) monitorShards() {
|
|||
|
||||
// inmem shards share the same index instance so just use the first one to avoid
|
||||
// allocating the same measurements repeatedly
|
||||
first := shards[0]
|
||||
names, err := first.MeasurementNamesByExpr(nil)
|
||||
first := shards[0].index
|
||||
names, err := (IndexSet{first}).MeasurementNamesByExpr(nil)
|
||||
if err != nil {
|
||||
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, name := range names {
|
||||
sh.ForEachMeasurementTagKey(name, func(k []byte) error {
|
||||
(IndexSet{sh.index}).ForEachMeasurementTagKey(name, func(k []byte) error {
|
||||
n := sh.TagKeyCardinality(name, k)
|
||||
perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
|
||||
if perc > 100 {
|
||||
|
|
Loading…
Reference in New Issue