From 1443b22379211ffd4bca509ab47b0b48da7d5370 Mon Sep 17 00:00:00 2001 From: Joe LeGasse Date: Tue, 19 Sep 2017 10:38:16 -0400 Subject: [PATCH] auth: add series auth to 'show tag values' --- cmd/influx_inspect/dumptsi/dumptsi.go | 2 +- coordinator/statement_executor.go | 4 ++-- coordinator/statement_executor_test.go | 2 +- tests/server_concurrent_test.go | 2 +- tsdb/engine.go | 2 +- tsdb/engine/tsm1/engine.go | 4 ++-- tsdb/index.go | 2 +- tsdb/index/inmem/inmem.go | 7 +++++-- tsdb/index/inmem/meta.go | 20 +++++++++++++++++--- tsdb/index/tsi1/file_set.go | 22 ++++++++++++++-------- tsdb/index/tsi1/index.go | 22 +++++++++++++++++----- tsdb/shard.go | 4 ++-- tsdb/store.go | 25 ++++++++++++++++++++++--- tsdb/store_test.go | 4 ++-- 14 files changed, 88 insertions(+), 34 deletions(-) diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go index 830271a409..aaa09cecea 100644 --- a/cmd/influx_inspect/dumptsi/dumptsi.go +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -172,7 +172,7 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) { } } - fs, err := tsi1.NewFileSet(nil, files) + fs, err := tsi1.NewFileSet("", nil, files) if err != nil { return nil, nil, err } diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 06e8efff76..49a9b35150 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -833,7 +833,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem return ErrDatabaseNameRequired } - tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition) + tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, q.Database, q.Condition) if err != nil { return ctx.Send(&query.Result{ StatementID: ctx.StatementID, @@ -1139,7 +1139,7 @@ type TSDBStore interface { DeleteShard(id uint64) error MeasurementNames(database string, cond influxql.Expr) ([][]byte, error) - TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error) + TagValues(auth query.Authorizer, database string, cond influxql.Expr) ([]tsdb.TagValues, error) } var _ TSDBStore = LocalTSDBStore{} diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index d301d17004..2c845f3cc8 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -377,7 +377,7 @@ func (s *TSDBStore) MeasurementNames(database string, cond influxql.Expr) ([][]b return nil, nil } -func (s *TSDBStore) TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error) { +func (s *TSDBStore) TagValues(_ query.Authorizer, database string, cond influxql.Expr) ([]tsdb.TagValues, error) { return nil, nil } diff --git a/tests/server_concurrent_test.go b/tests/server_concurrent_test.go index c40989a215..9583bc32e4 100644 --- a/tests/server_concurrent_test.go +++ b/tests/server_concurrent_test.go @@ -82,7 +82,7 @@ func TestConcurrentServer_TagValues(t *testing.T) { if !ok { t.Fatal("Not a local server") } - srv.TSDBStore.TagValues("db0", cond) + srv.TSDBStore.TagValues(nil, "db0", cond) } var f3 = func() { s.DropDatabase("db0") } diff --git a/tsdb/engine.go b/tsdb/engine.go index 70b175e029..d5e5cb0d5a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -65,7 +65,7 @@ type Engine interface { // TagKeys(name []byte) ([][]byte, error) HasTagKey(name, key []byte) (bool, error) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) - MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) + MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error TagKeyCardinality(name, key []byte) int diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index b6ddbcef0b..332228048f 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -360,8 +360,8 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[ // for the earliest tag k will be available in index 0 of the returned values // slice. // -func (e *Engine) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - return e.index.MeasurementTagKeyValuesByExpr(name, keys, expr, keysSorted) +func (e *Engine) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { + return e.index.MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted) } func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { diff --git a/tsdb/index.go b/tsdb/index.go index d479adc661..2cb54a9c41 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -36,7 +36,7 @@ type Index interface { HasTagKey(name, key []byte) (bool, error) TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) - MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) + MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error TagKeyCardinality(name, key []byte) int diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 5840658c86..6fb70287e1 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -277,7 +277,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // // See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this // method. -func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { +func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { i.mu.RLock() mm := i.measurements[string(name)] i.mu.RUnlock() @@ -296,7 +296,7 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i ids, _, _ := mm.WalkWhereForSeriesIds(expr) if ids.Len() == 0 && expr == nil { for ki, key := range keys { - values := mm.TagValues(key) + values := mm.TagValues(auth, key) sort.Sort(sort.StringSlice(values)) results[ki] = values } @@ -323,6 +323,9 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i if s == nil { continue } + if auth != nil && !auth.AuthorizeSeriesRead(i.database, s.Measurement().name, s.Tags()) { + continue + } // Iterate the tag keys we're interested in and collect values // from this series, if they exist. diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index 685fd4b36c..1964f2fd39 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -1455,12 +1455,26 @@ func (m *Measurement) TagKeys() []string { } // TagValues returns all the values for the given tag key, in an arbitrary order. -func (m *Measurement) TagValues(key string) []string { +func (m *Measurement) TagValues(auth query.Authorizer, key string) []string { m.mu.RLock() defer m.mu.RUnlock() values := make([]string, 0, len(m.seriesByTagKeyValue[key])) - for v := range m.seriesByTagKeyValue[key] { - values = append(values, v) +VALUES: + for v, series := range m.seriesByTagKeyValue[key] { + if auth == nil { + values = append(values, v) + } else { + for _, sid := range series { + s := m.seriesByID[sid] + if s == nil { + continue + } + if auth.AuthorizeSeriesRead(m.database, m.name, s.Tags()) { + values = append(values, v) + continue VALUES + } + } + } } return values } diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 5177cc39b6..e52dc26d96 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -12,22 +12,25 @@ import ( "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/estimator/hll" + "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/tsdb" ) // FileSet represents a collection of files. type FileSet struct { - levels []CompactionLevel - files []File - filters []*bloom.Filter // per-level filters + levels []CompactionLevel + files []File + filters []*bloom.Filter // per-level filters + database string } // NewFileSet returns a new instance of FileSet. -func NewFileSet(levels []CompactionLevel, files []File) (*FileSet, error) { +func NewFileSet(database string, levels []CompactionLevel, files []File) (*FileSet, error) { fs := &FileSet{ - levels: levels, - files: files, - filters: make([]*bloom.Filter, len(levels)), + levels: levels, + files: files, + filters: make([]*bloom.Filter, len(levels)), + database: database, } if err := fs.buildFilters(); err != nil { return nil, err @@ -312,7 +315,7 @@ func (fs *FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma // // N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending // lexicographic order. -func (fs *FileSet) tagValuesByKeyAndExpr(name []byte, keys []string, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) ([]map[string]struct{}, error) { +func (fs *FileSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) ([]map[string]struct{}, error) { itr, err := fs.seriesByExprIterator(name, expr, fieldset.Fields(string(name))) if err != nil { return nil, err @@ -337,6 +340,9 @@ func (fs *FileSet) tagValuesByKeyAndExpr(name []byte, keys []string, expr influx // Iterate all series to collect tag values. for e := itr.Next(); e != nil; e = itr.Next() { + if auth != nil && !auth.AuthorizeSeriesRead(fs.database, e.Name(), e.Tags()) { + continue + } for _, t := range e.Tags() { if idx, ok := keyIdxs[string(t.Key)]; ok { resultSet[idx][string(t.Value)] = struct{}{} diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index eedc7891c4..5a36fea384 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -184,7 +184,7 @@ func (i *Index) Open() error { files = append(files, f) } } - fs, err := NewFileSet(i.levels, files) + fs, err := NewFileSet(i.Database, i.levels, files) if err != nil { return err } @@ -639,7 +639,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // // See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this // method. -func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { +func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { fs := i.RetainFileSet() defer fs.Release() @@ -658,8 +658,20 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i if expr == nil { for ki, key := range keys { itr := fs.TagValueIterator(name, []byte(key)) - for val := itr.Next(); val != nil; val = itr.Next() { - results[ki] = append(results[ki], string(val.Value())) + if auth != nil { + for val := itr.Next(); val != nil; val = itr.Next() { + si := fs.TagValueSeriesIterator(name, []byte(key), val.Value()) + for se := si.Next(); se != nil; se = si.Next() { + if auth.AuthorizeSeriesRead(i.Database, se.Name(), se.Tags()) { + results[ki] = append(results[ki], string(val.Value())) + break + } + } + } + } else { + for val := itr.Next(); val != nil; val = itr.Next() { + results[ki] = append(results[ki], string(val.Value())) + } } } return results, nil @@ -668,7 +680,7 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i // This is the case where we have filtered series by some WHERE condition. // We only care about the tag values for the keys given the // filtered set of series ids. - resultSet, err := fs.tagValuesByKeyAndExpr(name, keys, expr, i.fieldset) + resultSet, err := fs.tagValuesByKeyAndExpr(auth, name, keys, expr, i.fieldset) if err != nil { return nil, err } diff --git a/tsdb/shard.go b/tsdb/shard.go index ced73e54fb..35ec90d55d 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -775,12 +775,12 @@ func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // MeasurementTagKeyValuesByExpr returns all the tag keys values for the // provided expression. -func (s *Shard) MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { +func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { engine, err := s.engine() if err != nil { return nil, err } - return engine.MeasurementTagKeyValuesByExpr(name, key, expr, keysSorted) + return engine.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted) } // MeasurementFields returns fields for a measurement. diff --git a/tsdb/store.go b/tsdb/store.go index d6ba0cc51e..28f3b78a06 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/limiter" + "github.com/influxdata/influxdb/query" "github.com/uber-go/zap" ) @@ -1025,7 +1026,7 @@ func (a tagValuesSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } // TagValues returns the tag keys and values in the given database, matching the condition. -func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error) { +func (s *Store) TagValues(auth query.Authorizer, database string, cond influxql.Expr) ([]TagValues, error) { if cond == nil { return nil, errors.New("a condition is required") } @@ -1121,10 +1122,28 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err // get all the tag values for each key in the keyset. // Each slice in the results contains the sorted values associated // associated with each tag key for the measurement from the key set. - if result.values, err = sh.MeasurementTagKeyValuesByExpr(name, result.keys, filterExpr, true); err != nil { + if result.values, err = sh.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true); err != nil { return nil, err } - allResults = append(allResults, result) + + // remove any tag keys that didn't have any authorized values + j := 0 + for i := range result.keys { + if len(result.values[i]) == 0 { + continue + } + + result.keys[j] = result.keys[i] + result.values[j] = result.values[i] + j++ + } + result.keys = result.keys[:j] + result.values = result.values[:j] + + // only include result if there are keys with values + if len(result.keys) > 0 { + allResults = append(allResults, result) + } } } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 60c21f697c..eb6f395521 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -938,7 +938,7 @@ func TestStore_TagValues(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { setup(index) t.Run(example.Name+"_"+index, func(t *testing.T) { - got, err := s.TagValues("db0", example.Expr) + got, err := s.TagValues(nil, "db0", example.Expr) if err != nil { t.Fatal(err) } @@ -1168,7 +1168,7 @@ func BenchmarkStore_TagValues(b *testing.B) { } b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - if tvResult, err = s.TagValues("db0", condition); err != nil { + if tvResult, err = s.TagValues(nil, "db0", condition); err != nil { b.Fatal(err) } }