Add FGA support to SHOW MEASUREMENTS
parent
5298339f21
commit
6851db3fc9
|
@ -742,7 +742,7 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
|
||||||
return ErrDatabaseNameRequired
|
return ErrDatabaseNameRequired
|
||||||
}
|
}
|
||||||
|
|
||||||
names, err := e.TSDBStore.MeasurementNames(q.Database, q.Condition)
|
names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition)
|
||||||
if err != nil || len(names) == 0 {
|
if err != nil || len(names) == 0 {
|
||||||
return ctx.Send(&query.Result{
|
return ctx.Send(&query.Result{
|
||||||
StatementID: ctx.StatementID,
|
StatementID: ctx.StatementID,
|
||||||
|
@ -1378,7 +1378,7 @@ type TSDBStore interface {
|
||||||
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
|
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
|
||||||
DeleteShard(id uint64) error
|
DeleteShard(id uint64) error
|
||||||
|
|
||||||
MeasurementNames(database string, cond influxql.Expr) ([][]byte, error)
|
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
|
||||||
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
|
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
|
||||||
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
|
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
|
||||||
|
|
||||||
|
|
|
@ -280,7 +280,7 @@ func NewQueryExecutor() *QueryExecutor {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
e.TSDBStore.MeasurementNamesFn = func(database string, cond influxql.Expr) ([][]byte, error) {
|
e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ type TSDBStoreMock struct {
|
||||||
ImportShardFn func(id uint64, r io.Reader) error
|
ImportShardFn func(id uint64, r io.Reader) error
|
||||||
MeasurementSeriesCountsFn func(database string) (measuments int, series int)
|
MeasurementSeriesCountsFn func(database string) (measuments int, series int)
|
||||||
MeasurementsCardinalityFn func(database string) (int64, error)
|
MeasurementsCardinalityFn func(database string) (int64, error)
|
||||||
MeasurementNamesFn func(database string, cond influxql.Expr) ([][]byte, error)
|
MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
|
||||||
OpenFn func() error
|
OpenFn func() error
|
||||||
PathFn func() string
|
PathFn func() string
|
||||||
RestoreShardFn func(id uint64, r io.Reader) error
|
RestoreShardFn func(id uint64, r io.Reader) error
|
||||||
|
@ -84,8 +84,8 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source
|
||||||
func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
|
func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
|
||||||
return s.ImportShardFn(id, r)
|
return s.ImportShardFn(id, r)
|
||||||
}
|
}
|
||||||
func (s *TSDBStoreMock) MeasurementNames(database string, cond influxql.Expr) ([][]byte, error) {
|
func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
|
||||||
return s.MeasurementNamesFn(database, cond)
|
return s.MeasurementNamesFn(auth, database, cond)
|
||||||
}
|
}
|
||||||
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measuments int, series int) {
|
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measuments int, series int) {
|
||||||
return s.MeasurementSeriesCountsFn(database)
|
return s.MeasurementSeriesCountsFn(database)
|
||||||
|
|
|
@ -133,7 +133,7 @@ func TestConcurrentServer_ShowMeasurements(t *testing.T) {
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatal("Not a local server")
|
t.Fatal("Not a local server")
|
||||||
}
|
}
|
||||||
srv.TSDBStore.MeasurementNames("db0", nil)
|
srv.TSDBStore.MeasurementNames(query.OpenAuthorizer, "db0", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
runTest(10*time.Second, f1, f2)
|
runTest(10*time.Second, f1, f2)
|
||||||
|
|
|
@ -59,7 +59,7 @@ type Engine interface {
|
||||||
SeriesN() int64
|
SeriesN() int64
|
||||||
|
|
||||||
MeasurementExists(name []byte) (bool, error)
|
MeasurementExists(name []byte) (bool, error)
|
||||||
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error)
|
||||||
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
||||||
MeasurementFields(measurement []byte) *MeasurementFields
|
MeasurementFields(measurement []byte) *MeasurementFields
|
||||||
ForEachMeasurementName(fn func(name []byte) error) error
|
ForEachMeasurementName(fn func(name []byte) error) error
|
||||||
|
|
|
@ -370,8 +370,8 @@ func (e *Engine) MeasurementExists(name []byte) (bool, error) {
|
||||||
return e.index.MeasurementExists(name)
|
return e.index.MeasurementExists(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
func (e *Engine) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||||
return e.index.MeasurementNamesByExpr(expr)
|
return e.index.MeasurementNamesByExpr(auth, expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||||
|
|
|
@ -19,7 +19,7 @@ type Index interface {
|
||||||
WithLogger(*zap.Logger)
|
WithLogger(*zap.Logger)
|
||||||
|
|
||||||
MeasurementExists(name []byte) (bool, error)
|
MeasurementExists(name []byte) (bool, error)
|
||||||
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error)
|
||||||
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
||||||
DropMeasurement(name []byte) error
|
DropMeasurement(name []byte) error
|
||||||
ForEachMeasurementName(fn func(name []byte) error) error
|
ForEachMeasurementName(fn func(name []byte) error) error
|
||||||
|
|
|
@ -392,24 +392,26 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) {
|
||||||
|
|
||||||
// MeasurementNamesByExpr takes an expression containing only tags and returns a
|
// MeasurementNamesByExpr takes an expression containing only tags and returns a
|
||||||
// list of matching meaurement names.
|
// list of matching meaurement names.
|
||||||
func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||||
i.mu.RLock()
|
i.mu.RLock()
|
||||||
defer i.mu.RUnlock()
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
// Return all measurement names if no expression is provided.
|
// Return all measurement names if no expression is provided.
|
||||||
if expr == nil {
|
if expr == nil {
|
||||||
a := make([][]byte, 0, len(i.measurements))
|
a := make([][]byte, 0, len(i.measurements))
|
||||||
for name := range i.measurements {
|
for _, m := range i.measurements {
|
||||||
a = append(a, []byte(name))
|
if m.Authorized(auth) {
|
||||||
|
a = append(a, m.name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
bytesutil.Sort(a)
|
bytesutil.Sort(a)
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return i.measurementNamesByExpr(expr)
|
return i.measurementNamesByExpr(auth, expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
func (i *Index) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||||
if expr == nil {
|
if expr == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -444,19 +446,19 @@ func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||||
|
|
||||||
// Match on name, if specified.
|
// Match on name, if specified.
|
||||||
if tag.Val == "_name" {
|
if tag.Val == "_name" {
|
||||||
return i.measurementNamesByNameFilter(tf.Op, tf.Value, tf.Regex), nil
|
return i.measurementNamesByNameFilter(auth, tf.Op, tf.Value, tf.Regex), nil
|
||||||
} else if influxql.IsSystemName(tag.Val) {
|
} else if influxql.IsSystemName(tag.Val) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return i.measurementNamesByTagFilters(tf), nil
|
return i.measurementNamesByTagFilters(auth, tf), nil
|
||||||
case influxql.OR, influxql.AND:
|
case influxql.OR, influxql.AND:
|
||||||
lhs, err := i.measurementNamesByExpr(e.LHS)
|
lhs, err := i.measurementNamesByExpr(auth, e.LHS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rhs, err := i.measurementNamesByExpr(e.RHS)
|
rhs, err := i.measurementNamesByExpr(auth, e.RHS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -469,13 +471,13 @@ func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
||||||
return nil, fmt.Errorf("invalid tag comparison operator")
|
return nil, fmt.Errorf("invalid tag comparison operator")
|
||||||
}
|
}
|
||||||
case *influxql.ParenExpr:
|
case *influxql.ParenExpr:
|
||||||
return i.measurementNamesByExpr(e.Expr)
|
return i.measurementNamesByExpr(auth, e.Expr)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("%#v", expr)
|
return nil, fmt.Errorf("%#v", expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// measurementNamesByNameFilter returns the sorted measurements matching a name.
|
// measurementNamesByNameFilter returns the sorted measurements matching a name.
|
||||||
func (i *Index) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
|
func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
|
||||||
var names [][]byte
|
var names [][]byte
|
||||||
for _, m := range i.measurements {
|
for _, m := range i.measurements {
|
||||||
var matched bool
|
var matched bool
|
||||||
|
@ -490,17 +492,16 @@ func (i *Index) measurementNamesByNameFilter(op influxql.Token, val string, rege
|
||||||
matched = !regex.MatchString(m.Name)
|
matched = !regex.MatchString(m.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !matched {
|
if matched && m.Authorized(auth) {
|
||||||
continue
|
names = append(names, m.name)
|
||||||
}
|
}
|
||||||
names = append(names, []byte(m.Name))
|
|
||||||
}
|
}
|
||||||
bytesutil.Sort(names)
|
bytesutil.Sort(names)
|
||||||
return names
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
// measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values.
|
// measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values.
|
||||||
func (i *Index) measurementNamesByTagFilters(filter *TagFilter) [][]byte {
|
func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagFilter) [][]byte {
|
||||||
// Build a list of measurements matching the filters.
|
// Build a list of measurements matching the filters.
|
||||||
var names [][]byte
|
var names [][]byte
|
||||||
var tagMatch bool
|
var tagMatch bool
|
||||||
|
@ -541,9 +542,8 @@ func (i *Index) measurementNamesByTagFilters(filter *TagFilter) [][]byte {
|
||||||
// True | False | False
|
// True | False | False
|
||||||
// False | True | False
|
// False | True | False
|
||||||
// False | False | True
|
// False | False | True
|
||||||
if tagMatch == (filter.Op == influxql.EQ || filter.Op == influxql.EQREGEX) {
|
if tagMatch == (filter.Op == influxql.EQ || filter.Op == influxql.EQREGEX) && m.Authorized(auth) {
|
||||||
names = append(names, []byte(m.Name))
|
names = append(names, []byte(m.Name))
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,28 @@ func NewMeasurement(database, name string) *Measurement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Authorized determines if this Measurement is authorized to be read, according
|
||||||
|
// to the provided Authorizer. A measurement is authorized to be read if at
|
||||||
|
// least one series from the measurement is authorized to be read.
|
||||||
|
func (m *Measurement) Authorized(auth query.Authorizer) bool {
|
||||||
|
if auth == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note(edd): the cost of this check scales linearly with the number of series
|
||||||
|
// belonging to a measurement, which means it may become expensive when there
|
||||||
|
// are large numbers of series on a measurement.
|
||||||
|
//
|
||||||
|
// In the future we might want to push the set of series down into the
|
||||||
|
// authorizer, but that will require an API change.
|
||||||
|
for _, s := range m.SeriesByIDMap() {
|
||||||
|
if auth.AuthorizeSeriesRead(m.database, m.name, s.tags) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Measurement) HasField(name string) bool {
|
func (m *Measurement) HasField(name string) bool {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
_, hasField := m.fieldNames[name]
|
_, hasField := m.fieldNames[name]
|
||||||
|
|
|
@ -39,9 +39,9 @@ func NewFileSet(database string, levels []CompactionLevel, files []File) (*FileS
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes all the files in the file set.
|
// Close closes all the files in the file set.
|
||||||
func (p FileSet) Close() error {
|
func (fs FileSet) Close() error {
|
||||||
var err error
|
var err error
|
||||||
for _, f := range p.files {
|
for _, f := range fs.files {
|
||||||
if e := f.Close(); e != nil && err == nil {
|
if e := f.Close(); e != nil && err == nil {
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
|
@ -535,10 +535,10 @@ func (fs *FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
func (fs *FileSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||||
// Return filtered list if expression exists.
|
// Return filtered list if expression exists.
|
||||||
if expr != nil {
|
if expr != nil {
|
||||||
return fs.measurementNamesByExpr(expr)
|
return fs.measurementNamesByExpr(auth, expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
itr := fs.MeasurementIterator()
|
itr := fs.MeasurementIterator()
|
||||||
|
@ -549,12 +549,14 @@ func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||||
// Iterate over all measurements if no condition exists.
|
// Iterate over all measurements if no condition exists.
|
||||||
var names [][]byte
|
var names [][]byte
|
||||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||||
names = append(names, e.Name())
|
if fs.measurementAuthorized(auth, e.Name()) {
|
||||||
|
names = append(names, e.Name())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return names, nil
|
return names, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
func (fs *FileSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||||
if expr == nil {
|
if expr == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -587,19 +589,19 @@ func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||||
|
|
||||||
// Match on name, if specified.
|
// Match on name, if specified.
|
||||||
if tag.Val == "_name" {
|
if tag.Val == "_name" {
|
||||||
return fs.measurementNamesByNameFilter(e.Op, value, regex), nil
|
return fs.measurementNamesByNameFilter(auth, e.Op, value, regex), nil
|
||||||
} else if influxql.IsSystemName(tag.Val) {
|
} else if influxql.IsSystemName(tag.Val) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return fs.measurementNamesByTagFilter(e.Op, tag.Val, value, regex), nil
|
return fs.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex), nil
|
||||||
|
|
||||||
case influxql.OR, influxql.AND:
|
case influxql.OR, influxql.AND:
|
||||||
lhs, err := fs.measurementNamesByExpr(e.LHS)
|
lhs, err := fs.measurementNamesByExpr(auth, e.LHS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rhs, err := fs.measurementNamesByExpr(e.RHS)
|
rhs, err := fs.measurementNamesByExpr(auth, e.RHS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -614,14 +616,14 @@ func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
case *influxql.ParenExpr:
|
case *influxql.ParenExpr:
|
||||||
return fs.measurementNamesByExpr(e.Expr)
|
return fs.measurementNamesByExpr(auth, e.Expr)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("%#v", expr)
|
return nil, fmt.Errorf("%#v", expr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// measurementNamesByNameFilter returns matching measurement names in sorted order.
|
// measurementNamesByNameFilter returns matching measurement names in sorted order.
|
||||||
func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
|
func (fs *FileSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
|
||||||
itr := fs.MeasurementIterator()
|
itr := fs.MeasurementIterator()
|
||||||
if itr == nil {
|
if itr == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -641,7 +643,7 @@ func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, r
|
||||||
matched = !regex.Match(e.Name())
|
matched = !regex.Match(e.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
if matched {
|
if matched && fs.measurementAuthorized(auth, e.Name()) {
|
||||||
names = append(names, e.Name())
|
names = append(names, e.Name())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -649,7 +651,7 @@ func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, r
|
||||||
return names
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte {
|
func (fs *FileSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte {
|
||||||
var names [][]byte
|
var names [][]byte
|
||||||
|
|
||||||
mitr := fs.MeasurementIterator()
|
mitr := fs.MeasurementIterator()
|
||||||
|
@ -687,7 +689,7 @@ func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val strin
|
||||||
// True | False | False
|
// True | False | False
|
||||||
// False | True | False
|
// False | True | False
|
||||||
// False | False | True
|
// False | False | True
|
||||||
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) {
|
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && fs.measurementAuthorized(auth, me.Name()) {
|
||||||
names = append(names, me.Name())
|
names = append(names, me.Name())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -697,6 +699,23 @@ func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val strin
|
||||||
return names
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// measurementAuthorized determines if the measurement is authorized to be read.
|
||||||
|
// A measurment is authorised to be read if at least one of the measurement's
|
||||||
|
// series is authorised to be read.
|
||||||
|
func (fs *FileSet) measurementAuthorized(auth query.Authorizer, name []byte) bool {
|
||||||
|
if auth == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
sitr := fs.MeasurementSeriesIterator(name)
|
||||||
|
for series := sitr.Next(); series != nil; series = sitr.Next() {
|
||||||
|
if auth.AuthorizeSeriesRead(fs.database, name, series.Tags()) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// HasSeries returns true if the series exists and is not tombstoned.
|
// HasSeries returns true if the series exists and is not tombstoned.
|
||||||
func (fs *FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
|
func (fs *FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
|
||||||
for _, f := range fs.files {
|
for _, f := range fs.files {
|
||||||
|
|
|
@ -400,11 +400,11 @@ func (i *Index) MeasurementExists(name []byte) (bool, error) {
|
||||||
return m != nil && !m.Deleted(), nil
|
return m != nil && !m.Deleted(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||||
fs := i.RetainFileSet()
|
fs := i.RetainFileSet()
|
||||||
defer fs.Release()
|
defer fs.Release()
|
||||||
|
|
||||||
names, err := fs.MeasurementNamesByExpr(expr)
|
names, err := fs.MeasurementNamesByExpr(auth, expr)
|
||||||
|
|
||||||
// Clone byte slices since they will be used after the fileset is released.
|
// Clone byte slices since they will be used after the fileset is released.
|
||||||
return bytesutil.CloneSlice(names), err
|
return bytesutil.CloneSlice(names), err
|
||||||
|
|
|
@ -139,7 +139,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) {
|
||||||
// Retrieve measurements by expression
|
// Retrieve measurements by expression
|
||||||
idx.Run(t, func(t *testing.T) {
|
idx.Run(t, func(t *testing.T) {
|
||||||
t.Run("EQ", func(t *testing.T) {
|
t.Run("EQ", func(t *testing.T) {
|
||||||
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region = 'west'`))
|
names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`region = 'west'`))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
|
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
|
||||||
|
@ -148,7 +148,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("NEQ", func(t *testing.T) {
|
t.Run("NEQ", func(t *testing.T) {
|
||||||
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region != 'east'`))
|
names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`region != 'east'`))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if !reflect.DeepEqual(names, [][]byte{[]byte("disk"), []byte("mem")}) {
|
} else if !reflect.DeepEqual(names, [][]byte{[]byte("disk"), []byte("mem")}) {
|
||||||
|
@ -157,7 +157,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("EQREGEX", func(t *testing.T) {
|
t.Run("EQREGEX", func(t *testing.T) {
|
||||||
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region =~ /east|west/`))
|
names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`region =~ /east|west/`))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
|
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
|
||||||
|
@ -166,7 +166,7 @@ func TestIndex_MeasurementNamesByExpr(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("NEQREGEX", func(t *testing.T) {
|
t.Run("NEQREGEX", func(t *testing.T) {
|
||||||
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`country !~ /^u/`))
|
names, err := idx.MeasurementNamesByExpr(nil, influxql.MustParseExpr(`country !~ /^u/`))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("disk")}) {
|
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("disk")}) {
|
||||||
|
|
|
@ -741,12 +741,12 @@ func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
|
||||||
|
|
||||||
// MeasurementNamesByExpr returns names of measurements matching the condition.
|
// MeasurementNamesByExpr returns names of measurements matching the condition.
|
||||||
// If cond is nil then all measurement names are returned.
|
// If cond is nil then all measurement names are returned.
|
||||||
func (s *Shard) MeasurementNamesByExpr(cond influxql.Expr) ([][]byte, error) {
|
func (s *Shard) MeasurementNamesByExpr(auth query.Authorizer, cond influxql.Expr) ([][]byte, error) {
|
||||||
engine, err := s.engine()
|
engine, err := s.engine()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return engine.MeasurementNamesByExpr(cond)
|
return engine.MeasurementNamesByExpr(auth, cond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MeasurementNamesByRegex returns names of measurements matching the regular expression.
|
// MeasurementNamesByRegex returns names of measurements matching the regular expression.
|
||||||
|
@ -1595,7 +1595,9 @@ func NewFieldKeysIterator(engine Engine, opt query.IteratorOptions) (query.Itera
|
||||||
itr := &fieldKeysIterator{engine: engine}
|
itr := &fieldKeysIterator{engine: engine}
|
||||||
|
|
||||||
// Retrieve measurements from shard. Filter if condition specified.
|
// Retrieve measurements from shard. Filter if condition specified.
|
||||||
names, err := engine.MeasurementNamesByExpr(opt.Condition)
|
//
|
||||||
|
// FGA is currently not supported when retrieving field keys.
|
||||||
|
names, err := engine.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1685,7 +1687,7 @@ type measurementKeyFunc func(name []byte) ([][]byte, error)
|
||||||
|
|
||||||
func newMeasurementKeysIterator(engine Engine, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) {
|
func newMeasurementKeysIterator(engine Engine, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) {
|
||||||
itr := &measurementKeysIterator{fn: fn}
|
itr := &measurementKeysIterator{fn: fn}
|
||||||
names, err := engine.MeasurementNamesByExpr(opt.Condition)
|
names, err := engine.MeasurementNamesByExpr(opt.Authorizer, opt.Condition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -956,7 +956,7 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
|
||||||
// MeasurementNames returns a slice of all measurements. Measurements accepts an
|
// MeasurementNames returns a slice of all measurements. Measurements accepts an
|
||||||
// optional condition expression. If cond is nil, then all measurements for the
|
// optional condition expression. If cond is nil, then all measurements for the
|
||||||
// database will be returned.
|
// database will be returned.
|
||||||
func (s *Store) MeasurementNames(database string, cond influxql.Expr) ([][]byte, error) {
|
func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
shards := s.filterShards(byDatabase(database))
|
shards := s.filterShards(byDatabase(database))
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
|
@ -974,7 +974,7 @@ func (s *Store) MeasurementNames(database string, cond influxql.Expr) ([][]byte,
|
||||||
set := make(map[string]struct{})
|
set := make(map[string]struct{})
|
||||||
var names [][]byte
|
var names [][]byte
|
||||||
for _, sh := range shards {
|
for _, sh := range shards {
|
||||||
a, err := sh.MeasurementNamesByExpr(cond)
|
a, err := sh.MeasurementNamesByExpr(auth, cond)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1074,7 +1074,9 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
|
||||||
// Determine list of measurements.
|
// Determine list of measurements.
|
||||||
nameSet := make(map[string]struct{})
|
nameSet := make(map[string]struct{})
|
||||||
for _, sh := range shards {
|
for _, sh := range shards {
|
||||||
names, err := sh.MeasurementNamesByExpr(measurementExpr)
|
// Checking for authorisation can be done later on, when non-matching
|
||||||
|
// series might have been filtered out based on other conditions.
|
||||||
|
names, err := sh.MeasurementNamesByExpr(nil, measurementExpr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1228,7 +1230,9 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
|
||||||
var maxMeasurements int // Hint as to lower bound on number of measurements.
|
var maxMeasurements int // Hint as to lower bound on number of measurements.
|
||||||
for _, sh := range shards {
|
for _, sh := range shards {
|
||||||
// names will be sorted by MeasurementNamesByExpr.
|
// names will be sorted by MeasurementNamesByExpr.
|
||||||
names, err := sh.MeasurementNamesByExpr(measurementExpr)
|
// Authorisation can be done later one, when series may have been filtered
|
||||||
|
// out by other conditions.
|
||||||
|
names, err := sh.MeasurementNamesByExpr(nil, measurementExpr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1495,7 +1499,7 @@ func (s *Store) monitorShards() {
|
||||||
// inmem shards share the same index instance so just use the first one to avoid
|
// inmem shards share the same index instance so just use the first one to avoid
|
||||||
// allocating the same measurements repeatedly
|
// allocating the same measurements repeatedly
|
||||||
first := shards[0]
|
first := shards[0]
|
||||||
names, err := first.MeasurementNamesByExpr(nil)
|
names, err := first.MeasurementNamesByExpr(nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
|
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -514,7 +514,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
|
||||||
`cpu value=3 20`,
|
`cpu value=3 20`,
|
||||||
)
|
)
|
||||||
|
|
||||||
meas, err := s.MeasurementNames("db0", nil)
|
meas, err := s.MeasurementNames(query.OpenAuthorizer, "db0", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error with MeasurementNames: %v", err)
|
t.Fatalf("unexpected error with MeasurementNames: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -555,7 +555,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all the series for each measurement.
|
// Delete all the series for each measurement.
|
||||||
mnames, err := store.MeasurementNames("db", nil)
|
mnames, err := store.MeasurementNames(query.OpenAuthorizer, "db", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -960,6 +960,64 @@ func TestStore_TagValues(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStore_Measurements_Auth(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
test := func(index string) error {
|
||||||
|
s := MustOpenStore(index)
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
// Create shard #0 with data.
|
||||||
|
s.MustCreateShardWithData("db0", "rp0", 0,
|
||||||
|
`cpu,host=serverA value=1 0`,
|
||||||
|
`cpu,host=serverA value=2 10`,
|
||||||
|
`cpu,region=west value=3 20`,
|
||||||
|
`cpu,secret=foo value=5 30`, // cpu still readable because it has other series that can be read.
|
||||||
|
`mem,secret=foo value=1 30`,
|
||||||
|
`disk value=4 30`,
|
||||||
|
)
|
||||||
|
|
||||||
|
authorizer := &internal.AuthorizerMock{
|
||||||
|
AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
|
||||||
|
if database == "" || tags.GetString("secret") != "" {
|
||||||
|
t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
names, err := s.MeasurementNames(authorizer, "db0", nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// names should not contain any measurements where none of the associated
|
||||||
|
// series are authorised for reads.
|
||||||
|
expNames := 2
|
||||||
|
var gotNames int
|
||||||
|
for _, name := range names {
|
||||||
|
if string(name) == "mem" {
|
||||||
|
return fmt.Errorf("got measurment %q but it should be filtered.", name)
|
||||||
|
}
|
||||||
|
gotNames++
|
||||||
|
}
|
||||||
|
|
||||||
|
if gotNames != expNames {
|
||||||
|
return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, index := range tsdb.RegisteredIndexes() {
|
||||||
|
t.Run(index, func(t *testing.T) {
|
||||||
|
if err := test(index); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStore_TagKeys_Auth(t *testing.T) {
|
func TestStore_TagKeys_Auth(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -990,7 +1048,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// values should not contain any tag values associated with a series containing
|
// keys should not contain any tag keys associated with a series containing
|
||||||
// a secret tag.
|
// a secret tag.
|
||||||
expKeys := 3
|
expKeys := 3
|
||||||
var gotKeys int
|
var gotKeys int
|
||||||
|
|
Loading…
Reference in New Issue