Improve performance of TagKeys
parent
89877d7764
commit
68dd5e27c8
|
@ -65,10 +65,10 @@ type Engine interface {
|
||||||
ForEachMeasurementName(fn func(name []byte) error) error
|
ForEachMeasurementName(fn func(name []byte) error) error
|
||||||
DeleteMeasurement(name []byte) error
|
DeleteMeasurement(name []byte) error
|
||||||
|
|
||||||
// TagKeys(name []byte) ([][]byte, error)
|
|
||||||
HasTagKey(name, key []byte) (bool, error)
|
HasTagKey(name, key []byte) (bool, error)
|
||||||
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
||||||
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||||
|
TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool
|
||||||
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||||
TagKeyCardinality(name, key []byte) int
|
TagKeyCardinality(name, key []byte) int
|
||||||
|
|
||||||
|
|
|
@ -391,6 +391,12 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[
|
||||||
return e.index.MeasurementTagKeysByExpr(name, expr)
|
return e.index.MeasurementTagKeysByExpr(name, expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TagKeyHasAuthorizedSeries determines if there exist authorized series for the
|
||||||
|
// provided measurement name and tag key.
|
||||||
|
func (e *Engine) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
|
||||||
|
return e.index.TagKeyHasAuthorizedSeries(auth, name, key)
|
||||||
|
}
|
||||||
|
|
||||||
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
|
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
|
||||||
//
|
//
|
||||||
// MeasurementTagKeyValuesByExpr relies on the provided tag keys being sorted.
|
// MeasurementTagKeyValuesByExpr relies on the provided tag keys being sorted.
|
||||||
|
|
|
@ -37,6 +37,7 @@ type Index interface {
|
||||||
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
|
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
|
||||||
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
||||||
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||||
|
TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool
|
||||||
|
|
||||||
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||||
TagKeyCardinality(name, key []byte) int
|
TagKeyCardinality(name, key []byte) int
|
||||||
|
|
|
@ -274,6 +274,48 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
|
||||||
return mm.TagKeysByExpr(expr)
|
return mm.TagKeysByExpr(expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
|
||||||
|
// the provided measurement name and tag key.
|
||||||
|
func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
|
||||||
|
i.mu.RLock()
|
||||||
|
mm := i.measurements[string(name)]
|
||||||
|
i.mu.RUnlock()
|
||||||
|
|
||||||
|
if mm == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(edd): This looks like it's inefficient. Since a series can have multiple
|
||||||
|
// tag key/value pairs on it, it's possible that the same unauthorised series
|
||||||
|
// will be checked multiple times. It would be more efficient if it were
|
||||||
|
// possible to get the set of unique series IDs for a given measurement name
|
||||||
|
// and tag key.
|
||||||
|
var authorized bool
|
||||||
|
mm.SeriesByTagKeyValue(key).Range(func(_ string, seriesIDs SeriesIDs) bool {
|
||||||
|
if auth == nil || auth == query.OpenAuthorizer {
|
||||||
|
authorized = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range seriesIDs {
|
||||||
|
s := mm.SeriesByID(id)
|
||||||
|
if s == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if auth.AuthorizeSeriesRead(i.database, mm.name, s.Tags()) {
|
||||||
|
authorized = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This tag key/value combination doesn't have any authorised series, so
|
||||||
|
// keep checking other tag values.
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return authorized
|
||||||
|
}
|
||||||
|
|
||||||
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
|
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
|
||||||
//
|
//
|
||||||
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
|
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
|
||||||
|
|
|
@ -1351,15 +1351,15 @@ func (t *TagKeyValue) LoadByte(value []byte) SeriesIDs {
|
||||||
// TagKeyValue is a no-op.
|
// TagKeyValue is a no-op.
|
||||||
//
|
//
|
||||||
// If f returns false then iteration over any remaining keys or values will cease.
|
// If f returns false then iteration over any remaining keys or values will cease.
|
||||||
func (t *TagKeyValue) Range(f func(k string, a SeriesIDs) bool) {
|
func (t *TagKeyValue) Range(f func(tagValue string, a SeriesIDs) bool) {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
defer t.mu.RUnlock()
|
defer t.mu.RUnlock()
|
||||||
for k, a := range t.valueIDs {
|
for tagValue, a := range t.valueIDs {
|
||||||
if !f(k, a) {
|
if !f(tagValue, a) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -642,6 +642,29 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
|
||||||
return fs.MeasurementTagKeysByExpr(name, expr)
|
return fs.MeasurementTagKeysByExpr(name, expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TagKeyHasAuthorizedSeries determines if there exist authorized series for the
|
||||||
|
// provided measurement name and tag key.
|
||||||
|
func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
|
||||||
|
fs := i.RetainFileSet()
|
||||||
|
defer fs.Release()
|
||||||
|
|
||||||
|
itr := fs.TagValueIterator(name, []byte(key))
|
||||||
|
for val := itr.Next(); val != nil; val = itr.Next() {
|
||||||
|
if auth == nil || auth == query.OpenAuthorizer {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identify an authorized series.
|
||||||
|
si := fs.TagValueSeriesIterator(name, []byte(key), val.Value())
|
||||||
|
for se := si.Next(); se != nil; se = si.Next() {
|
||||||
|
if auth.AuthorizeSeriesRead(i.Database, se.Name(), se.Tags()) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
|
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
|
||||||
//
|
//
|
||||||
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
|
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
|
||||||
|
|
|
@ -776,6 +776,16 @@ func (s *Shard) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([]
|
||||||
return engine.MeasurementSeriesKeysByExpr(name, expr)
|
return engine.MeasurementSeriesKeysByExpr(name, expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TagKeyHasAuthorizedSeries determines if there exists an authorized series on
|
||||||
|
// the provided measurement with the provided tag key.
|
||||||
|
func (s *Shard) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
|
||||||
|
engine, err := s.engine()
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return engine.TagKeyHasAuthorizedSeries(auth, name, key)
|
||||||
|
}
|
||||||
|
|
||||||
// MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
|
// MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
|
||||||
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
|
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
|
||||||
engine, err := s.engine()
|
engine, err := s.engine()
|
||||||
|
|
|
@ -1096,7 +1096,7 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
|
||||||
var results []TagKeys
|
var results []TagKeys
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
// Build keyset over all shards for measurement.
|
// Build keyset over all shards for measurement.
|
||||||
keySet := make(map[string]struct{})
|
keySet := map[string]struct{}{}
|
||||||
for _, sh := range shards {
|
for _, sh := range shards {
|
||||||
shardKeySet, err := sh.MeasurementTagKeysByExpr([]byte(name), nil)
|
shardKeySet, err := sh.MeasurementTagKeysByExpr([]byte(name), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1105,6 +1105,21 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If no tag value filter is present then all the tag keys can be returned
|
||||||
|
// if they have authorized series associated with them.
|
||||||
|
if filterExpr == nil {
|
||||||
|
for tagKey := range shardKeySet {
|
||||||
|
if sh.TagKeyHasAuthorizedSeries(auth, []byte(name), tagKey) {
|
||||||
|
keySet[tagKey] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// A tag value condition has been supplied. For each tag key filter
|
||||||
|
// the set of tag values by the condition. Only tag keys with remaining
|
||||||
|
// tag values will be included in the result set.
|
||||||
|
|
||||||
// Sort the tag keys.
|
// Sort the tag keys.
|
||||||
shardKeys := make([]string, 0, len(shardKeySet))
|
shardKeys := make([]string, 0, len(shardKeySet))
|
||||||
for k := range shardKeySet {
|
for k := range shardKeySet {
|
||||||
|
@ -1112,7 +1127,10 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
|
||||||
}
|
}
|
||||||
sort.Strings(shardKeys)
|
sort.Strings(shardKeys)
|
||||||
|
|
||||||
// Filter against tag values, skip if no values exist.
|
// TODO(edd): This is very expensive. We're materialising all unfiltered
|
||||||
|
// tag values for all required tag keys, only to see if we have any.
|
||||||
|
// Then we're throwing them all away as we only care about the tag
|
||||||
|
// keys in the result set.
|
||||||
shardValues, err := sh.MeasurementTagKeyValuesByExpr(auth, []byte(name), shardKeys, filterExpr, true)
|
shardValues, err := sh.MeasurementTagKeyValuesByExpr(auth, []byte(name), shardKeys, filterExpr, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in New Issue