auth: add series auth to 'show tag values'
parent
e0cba4477c
commit
1443b22379
|
@ -172,7 +172,7 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) {
|
|||
}
|
||||
}
|
||||
|
||||
fs, err := tsi1.NewFileSet(nil, files)
|
||||
fs, err := tsi1.NewFileSet("", nil, files)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -833,7 +833,7 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
|
|||
return ErrDatabaseNameRequired
|
||||
}
|
||||
|
||||
tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition)
|
||||
tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, q.Database, q.Condition)
|
||||
if err != nil {
|
||||
return ctx.Send(&query.Result{
|
||||
StatementID: ctx.StatementID,
|
||||
|
@ -1139,7 +1139,7 @@ type TSDBStore interface {
|
|||
DeleteShard(id uint64) error
|
||||
|
||||
MeasurementNames(database string, cond influxql.Expr) ([][]byte, error)
|
||||
TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
|
||||
TagValues(auth query.Authorizer, database string, cond influxql.Expr) ([]tsdb.TagValues, error)
|
||||
}
|
||||
|
||||
var _ TSDBStore = LocalTSDBStore{}
|
||||
|
|
|
@ -377,7 +377,7 @@ func (s *TSDBStore) MeasurementNames(database string, cond influxql.Expr) ([][]b
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *TSDBStore) TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error) {
|
||||
func (s *TSDBStore) TagValues(_ query.Authorizer, database string, cond influxql.Expr) ([]tsdb.TagValues, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ func TestConcurrentServer_TagValues(t *testing.T) {
|
|||
if !ok {
|
||||
t.Fatal("Not a local server")
|
||||
}
|
||||
srv.TSDBStore.TagValues("db0", cond)
|
||||
srv.TSDBStore.TagValues(nil, "db0", cond)
|
||||
}
|
||||
|
||||
var f3 = func() { s.DropDatabase("db0") }
|
||||
|
|
|
@ -65,7 +65,7 @@ type Engine interface {
|
|||
// TagKeys(name []byte) ([][]byte, error)
|
||||
HasTagKey(name, key []byte) (bool, error)
|
||||
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
||||
MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||
TagKeyCardinality(name, key []byte) int
|
||||
|
||||
|
|
|
@ -360,8 +360,8 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[
|
|||
// for the earliest tag k will be available in index 0 of the returned values
|
||||
// slice.
|
||||
//
|
||||
func (e *Engine) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
return e.index.MeasurementTagKeyValuesByExpr(name, keys, expr, keysSorted)
|
||||
func (e *Engine) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
return e.index.MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted)
|
||||
}
|
||||
|
||||
func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
|
||||
|
|
|
@ -36,7 +36,7 @@ type Index interface {
|
|||
HasTagKey(name, key []byte) (bool, error)
|
||||
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
|
||||
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
|
||||
MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
|
||||
|
||||
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
|
||||
TagKeyCardinality(name, key []byte) int
|
||||
|
|
|
@ -277,7 +277,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
|
|||
//
|
||||
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
|
||||
// method.
|
||||
func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
i.mu.RLock()
|
||||
mm := i.measurements[string(name)]
|
||||
i.mu.RUnlock()
|
||||
|
@ -296,7 +296,7 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i
|
|||
ids, _, _ := mm.WalkWhereForSeriesIds(expr)
|
||||
if ids.Len() == 0 && expr == nil {
|
||||
for ki, key := range keys {
|
||||
values := mm.TagValues(key)
|
||||
values := mm.TagValues(auth, key)
|
||||
sort.Sort(sort.StringSlice(values))
|
||||
results[ki] = values
|
||||
}
|
||||
|
@ -323,6 +323,9 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i
|
|||
if s == nil {
|
||||
continue
|
||||
}
|
||||
if auth != nil && !auth.AuthorizeSeriesRead(i.database, s.Measurement().name, s.Tags()) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Iterate the tag keys we're interested in and collect values
|
||||
// from this series, if they exist.
|
||||
|
|
|
@ -1455,12 +1455,26 @@ func (m *Measurement) TagKeys() []string {
|
|||
}
|
||||
|
||||
// TagValues returns all the values for the given tag key, in an arbitrary order.
|
||||
func (m *Measurement) TagValues(key string) []string {
|
||||
func (m *Measurement) TagValues(auth query.Authorizer, key string) []string {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
values := make([]string, 0, len(m.seriesByTagKeyValue[key]))
|
||||
for v := range m.seriesByTagKeyValue[key] {
|
||||
values = append(values, v)
|
||||
VALUES:
|
||||
for v, series := range m.seriesByTagKeyValue[key] {
|
||||
if auth == nil {
|
||||
values = append(values, v)
|
||||
} else {
|
||||
for _, sid := range series {
|
||||
s := m.seriesByID[sid]
|
||||
if s == nil {
|
||||
continue
|
||||
}
|
||||
if auth.AuthorizeSeriesRead(m.database, m.name, s.Tags()) {
|
||||
values = append(values, v)
|
||||
continue VALUES
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
|
|
@ -12,22 +12,25 @@ import (
|
|||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/pkg/estimator/hll"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// FileSet represents a collection of files.
|
||||
type FileSet struct {
|
||||
levels []CompactionLevel
|
||||
files []File
|
||||
filters []*bloom.Filter // per-level filters
|
||||
levels []CompactionLevel
|
||||
files []File
|
||||
filters []*bloom.Filter // per-level filters
|
||||
database string
|
||||
}
|
||||
|
||||
// NewFileSet returns a new instance of FileSet.
|
||||
func NewFileSet(levels []CompactionLevel, files []File) (*FileSet, error) {
|
||||
func NewFileSet(database string, levels []CompactionLevel, files []File) (*FileSet, error) {
|
||||
fs := &FileSet{
|
||||
levels: levels,
|
||||
files: files,
|
||||
filters: make([]*bloom.Filter, len(levels)),
|
||||
levels: levels,
|
||||
files: files,
|
||||
filters: make([]*bloom.Filter, len(levels)),
|
||||
database: database,
|
||||
}
|
||||
if err := fs.buildFilters(); err != nil {
|
||||
return nil, err
|
||||
|
@ -312,7 +315,7 @@ func (fs *FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma
|
|||
//
|
||||
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
|
||||
// lexicographic order.
|
||||
func (fs *FileSet) tagValuesByKeyAndExpr(name []byte, keys []string, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) ([]map[string]struct{}, error) {
|
||||
func (fs *FileSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) ([]map[string]struct{}, error) {
|
||||
itr, err := fs.seriesByExprIterator(name, expr, fieldset.Fields(string(name)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -337,6 +340,9 @@ func (fs *FileSet) tagValuesByKeyAndExpr(name []byte, keys []string, expr influx
|
|||
|
||||
// Iterate all series to collect tag values.
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
if auth != nil && !auth.AuthorizeSeriesRead(fs.database, e.Name(), e.Tags()) {
|
||||
continue
|
||||
}
|
||||
for _, t := range e.Tags() {
|
||||
if idx, ok := keyIdxs[string(t.Key)]; ok {
|
||||
resultSet[idx][string(t.Value)] = struct{}{}
|
||||
|
|
|
@ -184,7 +184,7 @@ func (i *Index) Open() error {
|
|||
files = append(files, f)
|
||||
}
|
||||
}
|
||||
fs, err := NewFileSet(i.levels, files)
|
||||
fs, err := NewFileSet(i.Database, i.levels, files)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -639,7 +639,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
|
|||
//
|
||||
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
|
||||
// method.
|
||||
func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
fs := i.RetainFileSet()
|
||||
defer fs.Release()
|
||||
|
||||
|
@ -658,8 +658,20 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i
|
|||
if expr == nil {
|
||||
for ki, key := range keys {
|
||||
itr := fs.TagValueIterator(name, []byte(key))
|
||||
for val := itr.Next(); val != nil; val = itr.Next() {
|
||||
results[ki] = append(results[ki], string(val.Value()))
|
||||
if auth != nil {
|
||||
for val := itr.Next(); val != nil; val = itr.Next() {
|
||||
si := fs.TagValueSeriesIterator(name, []byte(key), val.Value())
|
||||
for se := si.Next(); se != nil; se = si.Next() {
|
||||
if auth.AuthorizeSeriesRead(i.Database, se.Name(), se.Tags()) {
|
||||
results[ki] = append(results[ki], string(val.Value()))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for val := itr.Next(); val != nil; val = itr.Next() {
|
||||
results[ki] = append(results[ki], string(val.Value()))
|
||||
}
|
||||
}
|
||||
}
|
||||
return results, nil
|
||||
|
@ -668,7 +680,7 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i
|
|||
// This is the case where we have filtered series by some WHERE condition.
|
||||
// We only care about the tag values for the keys given the
|
||||
// filtered set of series ids.
|
||||
resultSet, err := fs.tagValuesByKeyAndExpr(name, keys, expr, i.fieldset)
|
||||
resultSet, err := fs.tagValuesByKeyAndExpr(auth, name, keys, expr, i.fieldset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -775,12 +775,12 @@ func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
|
|||
|
||||
// MeasurementTagKeyValuesByExpr returns all the tag keys values for the
|
||||
// provided expression.
|
||||
func (s *Shard) MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
|
||||
engine, err := s.engine()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return engine.MeasurementTagKeyValuesByExpr(name, key, expr, keysSorted)
|
||||
return engine.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted)
|
||||
}
|
||||
|
||||
// MeasurementFields returns fields for a measurement.
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
"github.com/influxdata/influxdb/pkg/estimator"
|
||||
"github.com/influxdata/influxdb/pkg/limiter"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/uber-go/zap"
|
||||
)
|
||||
|
||||
|
@ -1025,7 +1026,7 @@ func (a tagValuesSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|||
func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
|
||||
|
||||
// TagValues returns the tag keys and values in the given database, matching the condition.
|
||||
func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error) {
|
||||
func (s *Store) TagValues(auth query.Authorizer, database string, cond influxql.Expr) ([]TagValues, error) {
|
||||
if cond == nil {
|
||||
return nil, errors.New("a condition is required")
|
||||
}
|
||||
|
@ -1121,10 +1122,28 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
|
|||
// get all the tag values for each key in the keyset.
|
||||
// Each slice in the results contains the sorted values associated
|
||||
// associated with each tag key for the measurement from the key set.
|
||||
if result.values, err = sh.MeasurementTagKeyValuesByExpr(name, result.keys, filterExpr, true); err != nil {
|
||||
if result.values, err = sh.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allResults = append(allResults, result)
|
||||
|
||||
// remove any tag keys that didn't have any authorized values
|
||||
j := 0
|
||||
for i := range result.keys {
|
||||
if len(result.values[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
result.keys[j] = result.keys[i]
|
||||
result.values[j] = result.values[i]
|
||||
j++
|
||||
}
|
||||
result.keys = result.keys[:j]
|
||||
result.values = result.values[:j]
|
||||
|
||||
// only include result if there are keys with values
|
||||
if len(result.keys) > 0 {
|
||||
allResults = append(allResults, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -938,7 +938,7 @@ func TestStore_TagValues(t *testing.T) {
|
|||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
setup(index)
|
||||
t.Run(example.Name+"_"+index, func(t *testing.T) {
|
||||
got, err := s.TagValues("db0", example.Expr)
|
||||
got, err := s.TagValues(nil, "db0", example.Expr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -1168,7 +1168,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
|
|||
}
|
||||
b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
if tvResult, err = s.TagValues("db0", condition); err != nil {
|
||||
if tvResult, err = s.TagValues(nil, "db0", condition); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue