From 6851db3fc9060fc35f0fa707f6841b2f1c9d5475 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 15 Nov 2017 15:48:23 +0000 Subject: [PATCH] Add FGA support to SHOW MEASUREMENTS --- coordinator/statement_executor.go | 4 +- coordinator/statement_executor_test.go | 2 +- internal/tsdb_store.go | 6 +-- tests/server_concurrent_test.go | 2 +- tsdb/engine.go | 2 +- tsdb/engine/tsm1/engine.go | 4 +- tsdb/index.go | 2 +- tsdb/index/inmem/inmem.go | 34 +++++++------- tsdb/index/inmem/meta.go | 22 +++++++++ tsdb/index/tsi1/file_set.go | 49 ++++++++++++++------ tsdb/index/tsi1/index.go | 4 +- tsdb/index/tsi1/index_test.go | 8 ++-- tsdb/shard.go | 10 ++-- tsdb/store.go | 14 ++++-- tsdb/store_test.go | 64 ++++++++++++++++++++++++-- 15 files changed, 166 insertions(+), 61 deletions(-) diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 3bf3614ed7..3c77ed23aa 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -742,7 +742,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea return ErrDatabaseNameRequired } - names, err := e.TSDBStore.MeasurementNames(q.Database, q.Condition) + names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition) if err != nil || len(names) == 0 { return ctx.Send(&query.Result{ StatementID: ctx.StatementID, @@ -1378,7 +1378,7 @@ type TSDBStore interface { DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error - MeasurementNames(database string, cond influxql.Expr) ([][]byte, error) + MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index b30a93e5aa..ba771dcfc4 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -280,7 +280,7 @@ func NewQueryExecutor() *QueryExecutor { return nil } - e.TSDBStore.MeasurementNamesFn = func(database string, cond influxql.Expr) ([][]byte, error) { + e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { return nil, nil } diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index e24535a1be..1e0a4dec35 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -28,7 +28,7 @@ type TSDBStoreMock struct { ImportShardFn func(id uint64, r io.Reader) error MeasurementSeriesCountsFn func(database string) (measuments int, series int) MeasurementsCardinalityFn func(database string) (int64, error) - MeasurementNamesFn func(database string, cond influxql.Expr) ([][]byte, error) + MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) OpenFn func() error PathFn func() string RestoreShardFn func(id uint64, r io.Reader) error @@ -84,8 +84,8 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error { return s.ImportShardFn(id, r) } -func (s *TSDBStoreMock) MeasurementNames(database string, cond influxql.Expr) ([][]byte, error) { - return s.MeasurementNamesFn(database, cond) +func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { + return s.MeasurementNamesFn(auth, database, cond) } func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measuments int, series int) { return s.MeasurementSeriesCountsFn(database) diff --git a/tests/server_concurrent_test.go b/tests/server_concurrent_test.go index f06fa7209a..ac9e989ee4 100644 --- a/tests/server_concurrent_test.go +++ b/tests/server_concurrent_test.go @@ -133,7 +133,7 @@ func TestConcurrentServer_ShowMeasurements(t *testing.T) { if !ok { t.Fatal("Not a local server") } - srv.TSDBStore.MeasurementNames("db0", nil) + srv.TSDBStore.MeasurementNames(query.OpenAuthorizer, "db0", nil) } runTest(10*time.Second, f1, f2) diff --git a/tsdb/engine.go b/tsdb/engine.go index 301ee4eab1..52261ee855 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -59,7 +59,7 @@ type Engine interface { SeriesN() int64 MeasurementExists(name []byte) (bool, error) - MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) + MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) MeasurementFields(measurement []byte) *MeasurementFields ForEachMeasurementName(fn func(name []byte) error) error diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 52d6838fc3..f28d5070bd 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -370,8 +370,8 @@ func (e *Engine) MeasurementExists(name []byte) (bool, error) { return e.index.MeasurementExists(name) } -func (e *Engine) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { - return e.index.MeasurementNamesByExpr(expr) +func (e *Engine) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { + return e.index.MeasurementNamesByExpr(auth, expr) } func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { diff --git a/tsdb/index.go b/tsdb/index.go index d7615d76d4..a9e9e9b61f 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -19,7 +19,7 @@ type Index interface { WithLogger(*zap.Logger) MeasurementExists(name []byte) (bool, error) - MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) + MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) DropMeasurement(name []byte) error ForEachMeasurementName(fn func(name []byte) error) error diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 54918ddb4f..ddb94481ca 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -392,24 +392,26 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) { // MeasurementNamesByExpr takes an expression containing only tags and returns a // list of matching meaurement names. -func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { i.mu.RLock() defer i.mu.RUnlock() // Return all measurement names if no expression is provided. if expr == nil { a := make([][]byte, 0, len(i.measurements)) - for name := range i.measurements { - a = append(a, []byte(name)) + for _, m := range i.measurements { + if m.Authorized(auth) { + a = append(a, m.name) + } } bytesutil.Sort(a) return a, nil } - return i.measurementNamesByExpr(expr) + return i.measurementNamesByExpr(auth, expr) } -func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (i *Index) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { if expr == nil { return nil, nil } @@ -444,19 +446,19 @@ func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { // Match on name, if specified. if tag.Val == "_name" { - return i.measurementNamesByNameFilter(tf.Op, tf.Value, tf.Regex), nil + return i.measurementNamesByNameFilter(auth, tf.Op, tf.Value, tf.Regex), nil } else if influxql.IsSystemName(tag.Val) { return nil, nil } - return i.measurementNamesByTagFilters(tf), nil + return i.measurementNamesByTagFilters(auth, tf), nil case influxql.OR, influxql.AND: - lhs, err := i.measurementNamesByExpr(e.LHS) + lhs, err := i.measurementNamesByExpr(auth, e.LHS) if err != nil { return nil, err } - rhs, err := i.measurementNamesByExpr(e.RHS) + rhs, err := i.measurementNamesByExpr(auth, e.RHS) if err != nil { return nil, err } @@ -469,13 +471,13 @@ func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { return nil, fmt.Errorf("invalid tag comparison operator") } case *influxql.ParenExpr: - return i.measurementNamesByExpr(e.Expr) + return i.measurementNamesByExpr(auth, e.Expr) } return nil, fmt.Errorf("%#v", expr) } // measurementNamesByNameFilter returns the sorted measurements matching a name. -func (i *Index) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte { +func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte { var names [][]byte for _, m := range i.measurements { var matched bool @@ -490,17 +492,16 @@ func (i *Index) measurementNamesByNameFilter(op influxql.Token, val string, rege matched = !regex.MatchString(m.Name) } - if !matched { - continue + if matched && m.Authorized(auth) { + names = append(names, m.name) } - names = append(names, []byte(m.Name)) } bytesutil.Sort(names) return names } // measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values. -func (i *Index) measurementNamesByTagFilters(filter *TagFilter) [][]byte { +func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagFilter) [][]byte { // Build a list of measurements matching the filters. var names [][]byte var tagMatch bool @@ -541,9 +542,8 @@ func (i *Index) measurementNamesByTagFilters(filter *TagFilter) [][]byte { // True | False | False // False | True | False // False | False | True - if tagMatch == (filter.Op == influxql.EQ || filter.Op == influxql.EQREGEX) { + if tagMatch == (filter.Op == influxql.EQ || filter.Op == influxql.EQREGEX) && m.Authorized(auth) { names = append(names, []byte(m.Name)) - continue } } diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index d9a9681e2c..e9d8800dbd 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -52,6 +52,28 @@ func NewMeasurement(database, name string) *Measurement { } } +// Authorized determines if this Measurement is authorized to be read, according +// to the provided Authorizer. A measurement is authorized to be read if at +// least one series from the measurement is authorized to be read. +func (m *Measurement) Authorized(auth query.Authorizer) bool { + if auth == nil { + return true + } + + // Note(edd): the cost of this check scales linearly with the number of series + // belonging to a measurement, which means it may become expensive when there + // are large numbers of series on a measurement. + // + // In the future we might want to push the set of series down into the + // authorizer, but that will require an API change. + for _, s := range m.SeriesByIDMap() { + if auth.AuthorizeSeriesRead(m.database, m.name, s.tags) { + return true + } + } + return false +} + func (m *Measurement) HasField(name string) bool { m.mu.RLock() _, hasField := m.fieldNames[name] diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index e7d05ce8b8..d4073c2cdd 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -39,9 +39,9 @@ func NewFileSet(database string, levels []CompactionLevel, files []File) (*FileS } // Close closes all the files in the file set. -func (p FileSet) Close() error { +func (fs FileSet) Close() error { var err error - for _, f := range p.files { + for _, f := range fs.files { if e := f.Close(); e != nil && err == nil { err = e } @@ -535,10 +535,10 @@ func (fs *FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, ) } -func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (fs *FileSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { // Return filtered list if expression exists. if expr != nil { - return fs.measurementNamesByExpr(expr) + return fs.measurementNamesByExpr(auth, expr) } itr := fs.MeasurementIterator() @@ -549,12 +549,14 @@ func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) // Iterate over all measurements if no condition exists. var names [][]byte for e := itr.Next(); e != nil; e = itr.Next() { - names = append(names, e.Name()) + if fs.measurementAuthorized(auth, e.Name()) { + names = append(names, e.Name()) + } } return names, nil } -func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (fs *FileSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { if expr == nil { return nil, nil } @@ -587,19 +589,19 @@ func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) // Match on name, if specified. if tag.Val == "_name" { - return fs.measurementNamesByNameFilter(e.Op, value, regex), nil + return fs.measurementNamesByNameFilter(auth, e.Op, value, regex), nil } else if influxql.IsSystemName(tag.Val) { return nil, nil } - return fs.measurementNamesByTagFilter(e.Op, tag.Val, value, regex), nil + return fs.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex), nil case influxql.OR, influxql.AND: - lhs, err := fs.measurementNamesByExpr(e.LHS) + lhs, err := fs.measurementNamesByExpr(auth, e.LHS) if err != nil { return nil, err } - rhs, err := fs.measurementNamesByExpr(e.RHS) + rhs, err := fs.measurementNamesByExpr(auth, e.RHS) if err != nil { return nil, err } @@ -614,14 +616,14 @@ func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) } case *influxql.ParenExpr: - return fs.measurementNamesByExpr(e.Expr) + return fs.measurementNamesByExpr(auth, e.Expr) default: return nil, fmt.Errorf("%#v", expr) } } // measurementNamesByNameFilter returns matching measurement names in sorted order. -func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte { +func (fs *FileSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte { itr := fs.MeasurementIterator() if itr == nil { return nil @@ -641,7 +643,7 @@ func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, r matched = !regex.Match(e.Name()) } - if matched { + if matched && fs.measurementAuthorized(auth, e.Name()) { names = append(names, e.Name()) } } @@ -649,7 +651,7 @@ func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, r return names } -func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte { +func (fs *FileSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte { var names [][]byte mitr := fs.MeasurementIterator() @@ -687,7 +689,7 @@ func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val strin // True | False | False // False | True | False // False | False | True - if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) { + if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && fs.measurementAuthorized(auth, me.Name()) { names = append(names, me.Name()) continue } @@ -697,6 +699,23 @@ func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val strin return names } +// measurementAuthorized determines if the measurement is authorized to be read. +// A measurment is authorised to be read if at least one of the measurement's +// series is authorised to be read. +func (fs *FileSet) measurementAuthorized(auth query.Authorizer, name []byte) bool { + if auth == nil { + return true + } + + sitr := fs.MeasurementSeriesIterator(name) + for series := sitr.Next(); series != nil; series = sitr.Next() { + if auth.AuthorizeSeriesRead(fs.database, name, series.Tags()) { + return true + } + } + return false +} + // HasSeries returns true if the series exists and is not tombstoned. func (fs *FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool { for _, f := range fs.files { diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index e92c367ffe..d9e17361de 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -400,11 +400,11 @@ func (i *Index) MeasurementExists(name []byte) (bool, error) { return m != nil && !m.Deleted(), nil } -func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { fs := i.RetainFileSet() defer fs.Release() - names, err := fs.MeasurementNamesByExpr(expr) + names, err := fs.MeasurementNamesByExpr(auth, expr) // Clone byte slices since they will be used after the fileset is released. return bytesutil.CloneSlice(names), err diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 707cccb050..c8f9373bf0 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -139,7 +139,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) { // Retrieve measurements by expression idx.Run(t, func(t *testing.T) { t.Run("EQ", func(t *testing.T) { - names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region = 'west'`)) + names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`region = 'west'`)) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) { @@ -148,7 +148,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) { }) t.Run("NEQ", func(t *testing.T) { - names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region != 'east'`)) + names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`region != 'east'`)) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(names, [][]byte{[]byte("disk"), []byte("mem")}) { @@ -157,7 +157,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) { }) t.Run("EQREGEX", func(t *testing.T) { - names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region =~ /east|west/`)) + names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`region =~ /east|west/`)) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) { @@ -166,7 +166,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) { }) t.Run("NEQREGEX", func(t *testing.T) { - names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`country !~ /^u/`)) + names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`country !~ /^u/`)) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("disk")}) { diff --git a/tsdb/shard.go b/tsdb/shard.go index 033975d336..f1f3e2bca3 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -741,12 +741,12 @@ func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro // MeasurementNamesByExpr returns names of measurements matching the condition. // If cond is nil then all measurement names are returned. -func (s *Shard) MeasurementNamesByExpr(cond influxql.Expr) ([][]byte, error) { +func (s *Shard) MeasurementNamesByExpr(auth query.Authorizer, cond influxql.Expr) ([][]byte, error) { engine, err := s.engine() if err != nil { return nil, err } - return engine.MeasurementNamesByExpr(cond) + return engine.MeasurementNamesByExpr(auth, cond) } // MeasurementNamesByRegex returns names of measurements matching the regular expression. @@ -1595,7 +1595,9 @@ func NewFieldKeysIterator(engine Engine, opt query.IteratorOptions) (query.Itera itr := &fieldKeysIterator{engine: engine} // Retrieve measurements from shard. Filter if condition specified. - names, err := engine.MeasurementNamesByExpr(opt.Condition) + // + // FGA is currently not supported when retrieving field keys. + names, err := engine.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition) if err != nil { return nil, err } @@ -1685,7 +1687,7 @@ type measurementKeyFunc func(name []byte) ([][]byte, error) func newMeasurementKeysIterator(engine Engine, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) { itr := &measurementKeysIterator{fn: fn} - names, err := engine.MeasurementNamesByExpr(opt.Condition) + names, err := engine.MeasurementNamesByExpr(opt.Authorizer, opt.Condition) if err != nil { return nil, err } diff --git a/tsdb/store.go b/tsdb/store.go index a8fd69e1c8..49b5609a13 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -956,7 +956,7 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { // MeasurementNames returns a slice of all measurements. Measurements accepts an // optional condition expression. If cond is nil, then all measurements for the // database will be returned. -func (s *Store) MeasurementNames(database string, cond influxql.Expr) ([][]byte, error) { +func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() @@ -974,7 +974,7 @@ func (s *Store) MeasurementNames(database string, cond influxql.Expr) ([][]byte, set := make(map[string]struct{}) var names [][]byte for _, sh := range shards { - a, err := sh.MeasurementNamesByExpr(cond) + a, err := sh.MeasurementNamesByExpr(auth, cond) if err != nil { return nil, err } @@ -1074,7 +1074,9 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql. // Determine list of measurements. nameSet := make(map[string]struct{}) for _, sh := range shards { - names, err := sh.MeasurementNamesByExpr(measurementExpr) + // Checking for authorisation can be done later on, when non-matching + // series might have been filtered out based on other conditions. + names, err := sh.MeasurementNamesByExpr(nil, measurementExpr) if err != nil { return nil, err } @@ -1228,7 +1230,9 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq var maxMeasurements int // Hint as to lower bound on number of measurements. for _, sh := range shards { // names will be sorted by MeasurementNamesByExpr. - names, err := sh.MeasurementNamesByExpr(measurementExpr) + // Authorisation can be done later one, when series may have been filtered + // out by other conditions. + names, err := sh.MeasurementNamesByExpr(nil, measurementExpr) if err != nil { return nil, err } @@ -1495,7 +1499,7 @@ func (s *Store) monitorShards() { // inmem shards share the same index instance so just use the first one to avoid // allocating the same measurements repeatedly first := shards[0] - names, err := first.MeasurementNamesByExpr(nil) + names, err := first.MeasurementNamesByExpr(nil, nil) if err != nil { s.Logger.Warn("cannot retrieve measurement names", zap.Error(err)) return nil diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 89a9c54999..3e051871c9 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -514,7 +514,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) { `cpu value=3 20`, ) - meas, err := s.MeasurementNames("db0", nil) + meas, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil) if err != nil { t.Fatalf("unexpected error with MeasurementNames: %v", err) } @@ -555,7 +555,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { } // Delete all the series for each measurement. - mnames, err := store.MeasurementNames("db", nil) + mnames, err := store.MeasurementNames(query.OpenAuthorizer, "db", nil) if err != nil { t.Fatal(err) } @@ -960,6 +960,64 @@ func TestStore_TagValues(t *testing.T) { } } +func TestStore_Measurements_Auth(t *testing.T) { + t.Parallel() + + test := func(index string) error { + s := MustOpenStore(index) + defer s.Close() + + // Create shard #0 with data. + s.MustCreateShardWithData("db0", "rp0", 0, + `cpu,host=serverA value=1 0`, + `cpu,host=serverA value=2 10`, + `cpu,region=west value=3 20`, + `cpu,secret=foo value=5 30`, // cpu still readable because it has other series that can be read. + `mem,secret=foo value=1 30`, + `disk value=4 30`, + ) + + authorizer := &internal.AuthorizerMock{ + AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool { + if database == "" || tags.GetString("secret") != "" { + t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags) + return false + } + return true + }, + } + + names, err := s.MeasurementNames(authorizer, "db0", nil) + if err != nil { + return err + } + + // names should not contain any measurements where none of the associated + // series are authorised for reads. + expNames := 2 + var gotNames int + for _, name := range names { + if string(name) == "mem" { + return fmt.Errorf("got measurment %q but it should be filtered.", name) + } + gotNames++ + } + + if gotNames != expNames { + return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames) + } + return nil + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { + if err := test(index); err != nil { + t.Fatal(err) + } + }) + } +} + func TestStore_TagKeys_Auth(t *testing.T) { t.Parallel() @@ -990,7 +1048,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { return err } - // values should not contain any tag values associated with a series containing + // keys should not contain any tag keys associated with a series containing // a secret tag. expKeys := 3 var gotKeys int